https://github.com/returu/CityQuantHub
项目概述
https://lbs.amap.com/api/webservice/guide/api-advanced/search
-
一个key一个月只有5000次的免费配额; -
一次请求只能获取200个POI数据。

-
将大范围地理区域划分为多个小网格,然后对每个小网格分别请求POI数据,避免单次请求数据量过大; -
设置多个key,通过自动切换 API Key 以突破单个key的额度限制; -
每次尽量使用POI类型代码中的小类,例如餐饮服务(大类)→快餐厅(中类)→肯德基(小类),减少单次请求的数据量。
https://lbs.amap.com/
https://developer.amap.com/tools/picker
https://lbs.amap.com/api/webservice/download
代码解析
程序所需模块如下(仅需安装pandas这一第三方库,其余均为Python内置标准库):
import math
import json
import csv
import time
import os
import webbrowser
from urllib import request
import pandas as pd
-
1、RegionGridDivider类:区域网格化处理

-
divide_latitude方法:将纬度范围按照指定的网格大小进行划分,从北向南(纬度递减)划分网格直到覆盖整个区域; -
divide_longitude方法:将经度范围按照指定的网格大小进行划分,从西向东(经度递增)划分网格直到覆盖整个区域; -
generate_grid_coordinates方法:组合经纬度划分结果,生成所有小网格的坐标对,格式为”左上经度,左上纬度|右下经度,右下纬度”(高德API参数格式要求)。
# 大网格划分成小网格部分
class RegionGridDivider(object):
def __init__(self, bbox_str, grid_size):
"""
初始化区域分割器
参数:
bbox_str: 表示查询范围的坐标字符串,格式为"左上角坐标|右下角坐标"
例如: "121.457268,31.394788|122.012615,30.777097"
grid_size: 分割网格的大小,单位为度
"""
self.bbox_str = bbox_str
self.grid_size = grid_size
def divide_latitude(self):
"""
将查询区域的纬度范围按照网格大小进行划分
返回:
list: 从北到南的纬度坐标列表(降序排列)
"""
lat_max = float(self.bbox_str.split('|')[0].split(',')[1]) # 最大纬度
lat_min = float(self.bbox_str.split('|')[1].split(',')[1]) # 最小纬度
lat_list = [str(lat_max)]
# 从最大纬度逐步减去网格大小,直到小于最小纬度
while lat_max - lat_min > 0:
m = lat_max - self.grid_size
lat_max = lat_max - self.grid_size
lat_list.append("{:.2f}".format(m)) # 保留两位小数
return lat_list
def divide_longitude(self):
"""
将查询区域的经度范围按照网格大小进行划分
返回:
list: 从西到东的经度坐标列表(升序排列)
"""
lng_max = float(self.bbox_str.split('|')[1].split(',')[0]) # 最大经度
lng_min = float(self.bbox_str.split('|')[0].split(',')[0]) # 最小经度
lng_list = [str(lng_min)]
# 从最小经度逐步加上网格大小,直到大于最大经度
while lng_max - lng_min > 0:
m = lng_min + self.grid_size
lng_min = lng_min + self.grid_size
lng_list.append("{:.2f}".format(m)) # 保留两位小数
return sorted(lng_list) # 确保经度列表按升序排列
# 构建每个小网格的左上角(西北)和右下角(东南)的坐标对
def generate_grid_coordinates(self):
"""
生成所有小网格的坐标对
返回:
list: 每个元素为"左上角坐标|右下角坐标"格式的字符串
"""
lat = self.divide_latitude()
lng = self.divide_longitude()
ls = []
# 双重循环遍历所有经度和纬度的组合
for i in range(len(lng)-1):
for j in range(len(lat)-1):
# 左上角(西北)坐标
northwest = f"{lng[i]},{lat[j]}"
# 右下角(东南)坐标
southeast = f"{lng[i+1]},{lat[j+1]}"
coor = northwest + '|' + southeast # 组合成API所需的坐标格式
ls.append(coor)
return ls
-
2、CoordinateConverter类:坐标转换
# 构建坐标转换部分 - GCJ02(火星坐标系)与WGS84坐标系互转
class CoordinateConverter(object):
def __init__(self):
self.x_pi = 3.14159265358979324 * 3000.0 / 180.0
self.pi = 3.1415926535897932384626 # π
self.a = 6378245.0 # 长半轴
self.ee = 0.00669342162296594323 # 偏心率平方
def gcj02_to_wgs84(self, lng, lat):
"""
GCJ02(火星坐标系)转GPS84(世界标准坐标系)
参数:
lng: 经度(GCJ02)
lat: 纬度(GCJ02)
返回:
list: [经度(WGS84), 纬度(WGS84)]
"""
# 计算偏移量
dlat = self._transformlat(lng - 105.0, lat - 35.0)
dlng = self._transformlng(lng - 105.0, lat - 35.0)
# 坐标转换的数学计算
radlat = lat / 180.0 * self.pi
magic = math.sin(radlat)
magic = 1 - self.ee * magic * magic
sqrtmagic = math.sqrt(magic)
# 计算偏移修正量
dlat = (dlat * 180.0) / ((self.a * (1 - self.ee)) / (magic * sqrtmagic) * self.pi)
dlng = (dlng * 180.0) / (self.a / sqrtmagic * math.cos(radlat) * self.pi)
mglat = lat + dlat
mglng = lng + dlng
# 利用反向计算获取WGS-84坐标(通过对称原理)
return [lng * 2 - mglng, lat * 2 - mglat]
def _transformlat(self, lng, lat):
"""
纬度转换辅助函数 - 计算纬度偏移量
使用多项式和三角函数组合的经验公式
"""
ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat +
0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))
ret += (20.0 * math.sin(6.0 * lng * self.pi) + 20.0 *
math.sin(2.0 * lng * self.pi)) * 2.0 / 3.0
ret += (20.0 * math.sin(lat * self.pi) + 40.0 *
math.sin(lat / 3.0 * self.pi)) * 2.0 / 3.0
ret += (160.0 * math.sin(lat / 12.0 * self.pi) + 320 *
math.sin(lat * self.pi / 30.0)) * 2.0 / 3.0
return ret
def _transformlng(self, lng, lat):
"""
经度转换辅助函数 - 计算经度偏移量
使用多项式和三角函数组合的经验公式
"""
ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng +
0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))
ret += (20.0 * math.sin(6.0 * lng * self.pi) + 20.0 *
math.sin(2.0 * lng * self.pi)) * 2.0 / 3.0
ret += (20.0 * math.sin(lng * self.pi) + 40.0 *
math.sin(lng / 3.0 * self.pi)) * 2.0 / 3.0
ret += (150.0 * math.sin(lng / 12.0 * self.pi) + 300.0 *
math.sin(lng / 30.0 * self.pi)) * 2.0 / 3.0
return ret
-
3、GaodePoi类:API 请求与数据处理
-
switch_key方法:当当前 Key 失效或达到请求限制时,自动切换到下一个备用Key; -
log_message方法:记录日志信息,如果提供了log_callback函数,则调用该函数,否则直接打印到控制台; -
get_count方法:获取指定区域内某种类型 POI 的总数用于后续分页查询,并验证 Key 的有效性;
-
getPOIs方法:获取当前小网格内的所有POI数据,并将它们存储在一个生成器中。每页最多返回20个POI,通过循环请求所有页面来获取所有数据。对于每个POI,提取其ID、经纬度、名称、类型等信息,并以字典形式返回。
# 构建URL访问API部分
class GaodePoi(object):
def __init__(self, type_code, polygon, key_list, filename, log_callback=None):
"""
初始化高德地图POI数据获取器
参数:
type_code: POI类型代码,例如"050301"表示餐饮服务中快餐店中的肯德基
polygon: 查询区域的多边形坐标
key_list: API密钥列表,用于轮流使用以避免单个密钥请求超限
filename: 保存数据的CSV文件名
log_callback: 日志回调函数,用于输出运行信息
"""
self.type_code = type_code
self.polygon = polygon
self.key_list = key_list
self.current_key_index = 0
self.filename = filename
self.current_key = key_list[0] if key_list else None
self.log_callback = log_callback if log_callback elseprint
self.converter = CoordinateConverter() # 初始化坐标转换器
def switch_key(self):
"""切换到下一个API密钥"""
if not self.key_list:
raise ValueError("没有可用的key")
self.current_key_index = (self.current_key_index + 1) % len(self.key_list)
self.current_key = self.key_list[self.current_key_index]
self.log_message(f"已切换到备用key: {self.current_key[:5]}...")
time.sleep(2) # 切换key后等待2秒,避免立即请求失败
def log_message(self, message):
"""记录日志信息"""
if self.log_callback:
self.log_callback(message)
else:
print(message)
def get_count(self):
"""
获取指定区域内POI的数量并返回有效key
返回:
tuple: (POI数量, 当前有效key, 是否所有key都已尝试失败)
"""
max_attempts = len(self.key_list)
# 尝试使用所有可用key
for attempt in range(max_attempts):
try:
# 构建API请求URL,请求第一页数据以获取总数
url = f'https://restapi.amap.com/v3/place/polygon?key={self.current_key}&types={self.type_code}&polygon={self.polygon}&offset=20&page=1&extensions=all'
self.log_message(f"尝试使用key: {self.current_key[:5]}... 请求API...")
response = request.urlopen(url)
poi_json = json.load(response)
# 检查API返回状态
if poi_json['status'] == '0':
# 处理密钥失效或超限的情况
if poi_json['info'] in ('INVALID_USER_KEY', 'DAILY_QUERY_OVER_LIMIT'):
self.log_message(f"Key失效: {self.current_key[:5]}..., 错误信息: {poi_json['info']}")
if attempt == max_attempts - 1:
self.log_message("所有key都已尝试,仍然失败")
return 0, None, True # 第三个参数表示是否所有key都已尝试失败
self.switch_key()
continue
else:
self.log_message(f"API请求错误: {poi_json['info']}")
return 0, None, False
count = int(poi_json['count'])
self.log_message(f"当前使用key: {self.current_key[:5]}..., 状态: {poi_json['status']}, 找到 {count} 个POI")
time.sleep(1)
return count, self.current_key, False # 返回有效count和key
except Exception as e:
self.log_message(f"请求异常: {str(e)}")
if attempt < max_attempts - 1:
self.switch_key()
else:
self.log_message("所有key都已尝试,仍然失败")
return 0, None, True
return 0, None, True
# 根据输入的POI类型编码和城市编码,获取相应的POI数据并存储在文件中
def getPOIs(self):
"""
获取指定区域内的所有POI数据,返回一个生成器
返回:
generator: 每次产生一个POI的字典数据
"""
count, valid_key, all_failed = self.get_count()
if count == 0 or not valid_key or all_failed:
self.log_message("无数据或无有效key......")
return
# 计算总页数(每页20条数据)
# 官方文档强烈建议不超过25,若超过25可能造成访问报错
pages = count // 20 + 1
for page in range(1, pages+1):
try:
self.log_message(f'使用有效key: {valid_key[:5]}..., 正在获取第 {page}/{pages} 页数据')
url = f'https://restapi.amap.com/v3/place/polygon?key={valid_key}&types={self.type_code}&polygon={self.polygon}&offset=20&page={page}&extensions=all'
response = request.urlopen(url)
poi_json = json.load(response)
# 检查API返回状态
if poi_json['status'] == '0':
# 理论上这里不会出现key失效的情况,因为使用的是已经验证过的有效key
self.log_message(f"API请求错误: {poi_json['info']}")
break
pois = poi_json['pois']
for poi in pois:
# 提取需要的POI信息
result = {}
result["poi_id"] = poi['id'] # 唯一ID
# 获取原始坐标并转换为WGS84
lon_gcj02 = float(poi['location'].split(',')[0]) # 经度(GCJ02)
lat_gcj02 = float(poi['location'].split(',')[1]) # 纬度(GCJ02)
lon_wgs84, lat_wgs84 = self.converter.gcj02_to_wgs84(lon_gcj02, lat_gcj02)
result["lon"] = lon_wgs84 # 经度(WGS84)
result["lat"] = lat_wgs84 # 纬度(WGS84)
result["name"] = poi['name'] # POI名称
result["poi_type"] = poi['type'] # 兴趣点类型,顺序为大类、中类、小类
result["poi_type_code"] = poi['typecode'] # 兴趣点类型编码
result["cityname"] = poi['cityname'] # 城市名称
result["adname"] = poi['adname'] # 区域名称(区县级别的返回)
result["address"] = poi['address'] # 地址
yield result
except Exception as e:
self.log_message(f"请求异常: {str(e)}")
time.sleep(3) # 控制请求频率,避免API限流
-
主程序流程
在主程序的参数设置部分根据需要设置以下几个必须参数:
-
bbox_str:查询区域边界框(格式为”左上角坐标|右下角坐标”,经度和纬度用”,”分割,经度在前,纬度在后,坐标对用”|”分割。另外,经纬度小数点后不得超过6位)。具体坐标可以通过高德坐标拾取器得到坐标; -
key_list:API密钥列表,需要替换为实际可用的密钥; -
type_code:指定POI类型编码,尽量使用POI类型代码中的小类以避免单次请求的数量限制; -
grid_size:网格大小(单位为度),需合理设置网格大小,太小会导致API调用次数剧增;
-
filename:结果文件名,也可不设置,程序会自动以POIs_<POI类型编码>的名称保存。
if __name__ == "__main__":
"""
参数设置(根据需要进行替换)
"""
# 边界框坐标字符串,经度和纬度用","分割,经度在前,纬度在后,坐标对用"|"分割。经纬度小数点后不得超过6位。
bbox_str = '121.457268,31.394788|122.012615,30.777097'# 根据需要进行替换
# API密钥列表(使用时请替换为实际的备用key)
key_list = [
'key1',
'key2',
'key3',
]
# POI类型代码(具体可参考官方文档提供的高德POI分类与编码文件)
type_code = '050301'
# 保存结果的文件名(POI类型代码作为后缀名)
filename = f'POIs_{type_code}.csv'
# 网格大小(度),值越小划分越细,请求次数越多
grid_size = 0.1
"""
数据爬取
"""
# 创建区域分割器并生成网格
loc = RegionGridDivider(bbox_str, grid_size)
locs_to_use = loc.generate_grid_coordinates()
# 检查文件是否存在,用于控制CSV文件是否写入表头
file_exists = os.path.isfile(filename)
# 统计信息
poi_num = 0
num = len(locs_to_use)
# 遍历所有网格,获取POI数据
for loc in locs_to_use:
print(f"还剩{num}个网格")
num -= 1
par = GaodePoi(type_code=type_code, polygon=loc, key_list=key_list, filename=filename)
# 获取POI数量
count, _, all_failed = par.get_count() # 获取返回的第三个参数
if all_failed:
print("所有key都已尝试失败,程序停止")
break# 如果所有key都失败,则停止程序
poi_num += count
print(f"本次共获取{count}个poi数据")
print(f"总共获取{poi_num}个poi数据")
# 获取POI详细数据
dt = par.getPOIs()
df = pd.DataFrame(dt)
if len(df) != 0:
# 如果文件不存在,则在第一次写入时保存列名行
df.to_csv(filename, header=not file_exists, index=False, encoding='utf_8_sig', mode='a+')
file_exists = True # 确保后续写入不会重复保存列名行
time.sleep(1) # 控制写入频率
else:
pass # 跳过空数据
注意事项:使用API时请遵守高德地图的使用条款,以免被封禁IP。


本篇文章来源于微信公众号: 码农设计师
