首页城市定量分析【程序库】基于矩形区域的高...

【程序库】基于矩形区域的高德地图POI数据获取

计算机领域的发展趋势是开源共享,领域内的众多研究者、志愿者会将自己的代码、模型、数据放在诸如GitHub之类的互联网网站上,用户可以免费获得,并且可以基于一定规则自由地修改和分发。
本人在之前毕业论文写作过程中也是开源思想的受益者,使用到了很多开源的模型和数据集。
因此开启本系列内容,希望逐步打造成一个城市定量研究领域的开源代码仓库,让大家避免重复“造轮子”可以很方便地迁移到各自的研究或工作中。
因个人能力和时间均有限,本系列更新会较慢,因此希望有志同道合的朋友加入,共享自己的代码程序,加速代码仓库建设。
前往GitHub获取完整代码文件
https://github.com/returu/CityQuantHub
限于个人水平,不足之处在所难免,如若发现错误或有建议,欢迎留言交流。





01

项目概述



在地理信息分析等场景中,我们经常需要获取特定区域内的POI数据,如餐饮、酒店、商场等。高德地图提供了多种的POI数据接口(矩形、行政区搜索等),具体内容可以查看官方文档:
https://lbs.amap.com/api/webservice/guide/api-advanced/search
本次将通过高德地图的多边形搜索API,批量获取指定矩形区域内特定类型的POI(Point of Interest,兴趣点)数据。
需要注意的是,高德对API服务调用量有一些限制,直接获取大范围区域的POI数据可能会遇到请求限制:
  • key个月只有5000次的免费配额;
  • 一次请求只能获取200POI数据
因此,我们需要一些技巧来避免上述问题:
  • 将大范围地理区域划分为多个小网格,然后对每小网格分别请POI数据,避免单次请求数据量过大;
  • 设置多key,通过自动切换 API Key 以突破单个key的额度限制;
  • 每次尽量使用POI类型代码中的小类,例如餐饮服务(大类)→快餐厅(中类)→肯德基(小类),减少单次请求的数据量。
API Key可前往高德开放平台申请获取(需实名认证注册)
https://lbs.amap.com/
矩形框的左上、右下坐标值可通过高德坐标拾取器获取
https://developer.amap.com/tools/picker
POI类型代码可参考高德官方文档(代码文件中“POI分类编码和城市编码表”文件夹中已提供
https://lbs.amap.com/api/webservice/download
需要注意到是,该程序返回的POI数据的经纬度坐标为GWS84坐标系
02

代码解析



程序主要由三个核心类组成:RegionGridDividerCoordinateConverterGaodePoi。如需使用的话,在主程序的参数设置部分根据需要设置几个必须参数即可。

程序所需模块如下(仅需安装pandas这一第三方库,其余均为Python内置标准库):

import math
import json
import csv
import time
import os
import webbrowser
from urllib import request
import pandas as pd


  • 1、RegionGridDivider类:区域网格化处理
该类的主要作用是将一个较大的地理区域划分为多个小网格。由于高德地图 API 对单次请求的POI数量有一定限制,因此我们需要将大范围区域分割成多个小网格,然后依次对各个小网格分别进行请求。
该类包含以下三个方法:
  • divide_latitude方法:将纬度范围按照指定的网格大小进行划分,从北向南(纬度递减)划分网格直到覆盖整个区域;
  • divide_longitude方法:将经度范围按照指定的网格大小进行划分,从西向东(经度递增)划分网格直到覆盖整个区域
  • generate_grid_coordinates方法:组合经纬度划分结果,生成所有小网格的坐标对,格式为”左上经度,左上纬度|右下经度,右下纬度”(高德API参数格式要求)。
# 大网格划分成小网格部分
class RegionGridDivider(object):
    def __init__(self, bbox_str, grid_size):  
        """
        初始化区域分割器
        
        参数:
            bbox_str: 表示查询范围的坐标字符串,格式为"
左上角坐标|右下角坐标"
                      例如: "
121.457268,31.394788|122.012615,30.777097"
            grid_size: 分割网格的大小,单位为度
        "
""
        self.bbox_str = bbox_str
        self.grid_size = grid_size

    def divide_latitude(self):
        """
        将查询区域的纬度范围按照网格大小进行划分
        
        返回:
            list: 从北到南的纬度坐标列表(降序排列)
        "
""
        lat_max = float(self.bbox_str.split('|')[0].split(',')[1])  # 最大纬度
        lat_min = float(self.bbox_str.split('|')[1].split(',')[1])  # 最小纬度
        lat_list = [str(lat_max)]
        # 从最大纬度逐步减去网格大小,直到小于最小纬度
        while lat_max - lat_min > 0:
            m = lat_max - self.grid_size
            lat_max = lat_max - self.grid_size
            lat_list.append("{:.2f}".format(m)) # 保留两位小数
        return lat_list

    def divide_longitude(self):
        """
        将查询区域的经度范围按照网格大小进行划分
        
        返回:
            list: 从西到东的经度坐标列表(升序排列)
        "
""
        lng_max = float(self.bbox_str.split('|')[1].split(',')[0])  # 最大经度
        lng_min = float(self.bbox_str.split('|')[0].split(',')[0])  # 最小经度
        lng_list = [str(lng_min)]
        # 从最小经度逐步加上网格大小,直到大于最大经度
        while lng_max - lng_min > 0:
            m = lng_min + self.grid_size
            lng_min = lng_min + self.grid_size
            lng_list.append("{:.2f}".format(m)) # 保留两位小数
        return sorted(lng_list)  # 确保经度列表按升序排列

    # 构建每个小网格的左上角(西北)和右下角(东南)的坐标对
    def generate_grid_coordinates(self):
        """
        生成所有小网格的坐标对
        
        返回:
            list: 每个元素为"
左上角坐标|右下角坐标"格式的字符串
        "
""
        lat = self.divide_latitude()
        lng = self.divide_longitude()
        ls = []
        # 双重循环遍历所有经度和纬度的组合
        for i in range(len(lng)-1):
            for j in range(len(lat)-1):
                # 左上角(西北)坐标
                northwest = f"{lng[i]},{lat[j]}"
                # 右下角(东南)坐标
                southeast = f"{lng[i+1]},{lat[j+1]}"
                coor = northwest + '|' + southeast # 组合成API所需的坐标格式
                ls.append(coor)
        return ls


  • 2、CoordinateConverter类:坐标转换

该类用于将高德地图 API 返回的GCJ02(火星坐标系)转换为WGS84坐标系
# 构建坐标转换部分 - GCJ02(火星坐标系)与WGS84坐标系互转
class CoordinateConverter(object):
    def __init__(self):
        self.x_pi = 3.14159265358979324 * 3000.0 / 180.0
        self.pi = 3.1415926535897932384626  # π
        self.a = 6378245.0  # 长半轴
        self.ee = 0.00669342162296594323  # 偏心率平方

    def gcj02_to_wgs84(self, lng, lat):
        """
        GCJ02(火星坐标系)转GPS84(世界标准坐标系)
        
        参数:
            lng: 经度(GCJ02)
            lat: 纬度(GCJ02)
            
        返回:
            list: [经度(WGS84), 纬度(WGS84)]
        "
""
        # 计算偏移量
        dlat = self._transformlat(lng - 105.0, lat - 35.0)
        dlng = self._transformlng(lng - 105.0, lat - 35.0)
        # 坐标转换的数学计算
        radlat = lat / 180.0 * self.pi
        magic = math.sin(radlat)
        magic = 1 - self.ee * magic * magic
        sqrtmagic = math.sqrt(magic)
        # 计算偏移修正量
        dlat = (dlat * 180.0) / ((self.a * (1 - self.ee)) / (magic * sqrtmagic) * self.pi)
        dlng = (dlng * 180.0) / (self.a / sqrtmagic * math.cos(radlat) * self.pi)
        mglat = lat + dlat
        mglng = lng + dlng
        # 利用反向计算获取WGS-84坐标(通过对称原理)
        return [lng * 2 - mglng, lat * 2 - mglat]

    def _transformlat(self, lng, lat):
        """
        纬度转换辅助函数 - 计算纬度偏移量
        使用多项式和三角函数组合的经验公式
        "
""
        ret = -100.0 + 2.0 * lng + 3.0 * lat + 0.2 * lat * lat +
              0.1 * lng * lat + 0.2 * math.sqrt(math.fabs(lng))
        ret += (20.0 * math.sin(6.0 * lng * self.pi) + 20.0 *
                math.sin(2.0 * lng * self.pi)) * 2.0 / 3.0
        ret += (20.0 * math.sin(lat * self.pi) + 40.0 *
                math.sin(lat / 3.0 * self.pi)) * 2.0 / 3.0
        ret += (160.0 * math.sin(lat / 12.0 * self.pi) + 320 *
                math.sin(lat * self.pi / 30.0)) * 2.0 / 3.0
        return ret

    def _transformlng(self, lng, lat):
        """
        经度转换辅助函数 - 计算经度偏移量
        使用多项式和三角函数组合的经验公式
        "
""
        ret = 300.0 + lng + 2.0 * lat + 0.1 * lng * lng +
              0.1 * lng * lat + 0.1 * math.sqrt(math.fabs(lng))
        ret += (20.0 * math.sin(6.0 * lng * self.pi) + 20.0 *
                math.sin(2.0 * lng * self.pi)) * 2.0 / 3.0
        ret += (20.0 * math.sin(lng * self.pi) + 40.0 *
                math.sin(lng / 3.0 * self.pi)) * 2.0 / 3.0
        ret += (150.0 * math.sin(lng / 12.0 * self.pi) + 300.0 *
                math.sin(lng / 30.0 * self.pi)) * 2.0 / 3.0
        return ret


  • 3、GaodePoi类:API 请求与数据处理
该类是程序核心,负责与高德地图 API 进行交互并处理返回的数据。包含以下四个方法:
  • switch_key方法:当当前 Key 失效或达到请求限制时,自动切换到下一个备用Key
  • log_message方法:记录日志信息,如果提供了log_callback函数,则调用该函数,否则直接打印到控制台
  • get_count方法:获取指定区域内某种类型 POI 的总数用于后续分页查询,并验证 Key 的有效性;
  • getPOIs方法:获取当前小网格内的所有POI数据,并将它们存储在一个生成器中。每页最多返回20个POI,通过循环请求所有页面来获取所有数据。对于每个POI,提取其ID、经纬度、名称、类型等信息,并以字典形式返回。
# 构建URL访问API部分
class GaodePoi(object):
    def __init__(self, type_code, polygon, key_list, filename, log_callback=None):
        """
        初始化高德地图POI数据获取器
        
        参数:
            type_code: POI类型代码,例如"
050301"表示餐饮服务中快餐店中的肯德基
            polygon: 查询区域的多边形坐标
            key_list: API密钥列表,用于轮流使用以避免单个密钥请求超限
            filename: 保存数据的CSV文件名
            log_callback: 日志回调函数,用于输出运行信息
        "
""
        self.type_code = type_code
        self.polygon = polygon
        self.key_list = key_list
        self.current_key_index = 0
        self.filename = filename
        self.current_key = key_list[0] if key_list else None
        self.log_callback = log_callback if log_callback elseprint
        self.converter = CoordinateConverter()  # 初始化坐标转换器

    def switch_key(self):
        """切换到下一个API密钥"""
        if not self.key_list:
            raise ValueError("没有可用的key")
            
        self.current_key_index = (self.current_key_index + 1) % len(self.key_list)
        self.current_key = self.key_list[self.current_key_index]
        self.log_message(f"已切换到备用key: {self.current_key[:5]}...")
        time.sleep(2)  # 切换key后等待2秒,避免立即请求失败

    def log_message(self, message):
        """记录日志信息"""
        if self.log_callback:
            self.log_callback(message)
        else:
            print(message)

    def get_count(self):
        """
        获取指定区域内POI的数量并返回有效key
        
        返回:
            tuple: (POI数量, 当前有效key, 是否所有key都已尝试失败)
        "
""
        max_attempts = len(self.key_list)
        # 尝试使用所有可用key
        for attempt in range(max_attempts):
            try:
                # 构建API请求URL,请求第一页数据以获取总数
                url = f'https://restapi.amap.com/v3/place/polygon?key={self.current_key}&types={self.type_code}&polygon={self.polygon}&offset=20&page=1&extensions=all'
                self.log_message(f"尝试使用key: {self.current_key[:5]}... 请求API...")
                response = request.urlopen(url)
                poi_json = json.load(response)

                # 检查API返回状态
                if poi_json['status'] == '0':
                    # 处理密钥失效或超限的情况
                    if poi_json['info'in ('INVALID_USER_KEY''DAILY_QUERY_OVER_LIMIT'):
                        self.log_message(f"Key失效: {self.current_key[:5]}..., 错误信息: {poi_json['info']}")
                        if attempt == max_attempts - 1:
                            self.log_message("所有key都已尝试,仍然失败")
                            return 0, None, True  # 第三个参数表示是否所有key都已尝试失败
                        self.switch_key()
                        continue
                    else:
                        self.log_message(f"API请求错误: {poi_json['info']}")
                        return 0, None, False

                count = int(poi_json['count'])
                self.log_message(f"当前使用key: {self.current_key[:5]}..., 状态: {poi_json['status']}, 找到 {count} 个POI")
                time.sleep(1)
                return count, self.current_key, False  # 返回有效count和key

            except Exception as e:
                self.log_message(f"请求异常: {str(e)}")
                if attempt < max_attempts - 1:
                    self.switch_key()
                else:
                    self.log_message("所有key都已尝试,仍然失败")
                    return 0, None, True

        return 0, None, True

    # 根据输入的POI类型编码和城市编码,获取相应的POI数据并存储在文件中
    def getPOIs(self):
        """
        获取指定区域内的所有POI数据,返回一个生成器
        
        返回:
            generator: 每次产生一个POI的字典数据
        "
""
        count, valid_key, all_failed = self.get_count()
        
        if count == 0 or not valid_key or all_failed:
            self.log_message("无数据或无有效key......")
            return
            
        # 计算总页数(每页20条数据)
        # 官方文档强烈建议不超过25,若超过25可能造成访问报错
        pages = count // 20 + 1
        for page in range(1, pages+1):
            try:
                self.log_message(f'使用有效key: {valid_key[:5]}..., 正在获取第 {page}/{pages} 页数据')
                url = f'https://restapi.amap.com/v3/place/polygon?key={valid_key}&types={self.type_code}&polygon={self.polygon}&offset=20&page={page}&extensions=all'
                response = request.urlopen(url)
                poi_json = json.load(response)
                
                # 检查API返回状态
                if poi_json['status'] == '0':
                    # 理论上这里不会出现key失效的情况,因为使用的是已经验证过的有效key
                    self.log_message(f"API请求错误: {poi_json['info']}")
                    break
                
                pois = poi_json['pois']
                for poi in pois:
                    # 提取需要的POI信息
                    result = {}
                    result["poi_id"] = poi['id'# 唯一ID
                    
                    # 获取原始坐标并转换为WGS84
                    lon_gcj02 = float(poi['location'].split(',')[0])  # 经度(GCJ02)
                    lat_gcj02 = float(poi['location'].split(',')[1])  # 纬度(GCJ02)
                    lon_wgs84, lat_wgs84 = self.converter.gcj02_to_wgs84(lon_gcj02, lat_gcj02)
                    
                    result["lon"] = lon_wgs84  # 经度(WGS84)
                    result["lat"] = lat_wgs84  # 纬度(WGS84)
                    result["name"] = poi['name'# POI名称
                    result["poi_type"] = poi['type'# 兴趣点类型,顺序为大类、中类、小类
                    result["poi_type_code"] = poi['typecode'# 兴趣点类型编码
                    result["cityname"] = poi['cityname'# 城市名称
                    result["adname"] = poi['adname'# 区域名称(区县级别的返回)
                    result["address"] = poi['address'# 地址

                    yield result
                
            except Exception as e:
                self.log_message(f"请求异常: {str(e)}")
            
            time.sleep(3)  # 控制请求频率,避免API限流

  • 主程序流程

在主程序的参数设置部分根据需要设置以下几个必须参数:

  • bbox_str:查询区域边界框(格式为”左上角坐标|右下角坐标”,经度和纬度用”,”分割,经度在前,纬度在后,坐标对用”|”分割。另外,经纬度小数点后不得超过6位)。具体坐标可以通过高德坐标拾取器得到坐标
  • key_listAPI钥列表,需要替换为实际可用的密钥;
  • type_code指定POI类型编码,尽量使用POI类型代码中的小类以避免单次请求的数量限制;
  • grid_size:网格大小(单位为度),需合理设置网格大小,太小会导致API用次数剧增
  • filename:结果文件名,也可不设置,程序会自POIs_<POI类型编码>名称保存
if __name__ == "__main__":
    """
    参数设置(根据需要进行替换)
    "
""
    # 边界框坐标字符串,经度和纬度用","分割,经度在前,纬度在后,坐标对用"|"分割。经纬度小数点后不得超过6位。
    bbox_str = '121.457268,31.394788|122.012615,30.777097'# 根据需要进行替换
    # API密钥列表(使用时请替换为实际的备用key)
    key_list = [
        'key1',  
        'key2',
        'key3',
    ]
    # POI类型代码(具体可参考官方文档提供的高德POI分类与编码文件)
    type_code = '050301'
    # 保存结果的文件名(POI类型代码作为后缀名)
    filename = f'POIs_{type_code}.csv'
    # 网格大小(度),值越小划分越细,请求次数越多
    grid_size = 0.1  

    """
    数据爬取
    "
""
    # 创建区域分割器并生成网格
    loc = RegionGridDivider(bbox_str, grid_size)
    locs_to_use = loc.generate_grid_coordinates()
    # 检查文件是否存在,用于控制CSV文件是否写入表头
    file_exists = os.path.isfile(filename)
    # 统计信息
    poi_num = 0
    num = len(locs_to_use)
    # 遍历所有网格,获取POI数据
    for loc in locs_to_use:
        print(f"还剩{num}个网格")
        num -= 1
        par = GaodePoi(type_code=type_code, polygon=loc, key_list=key_list, filename=filename)
        # 获取POI数量
        count, _, all_failed = par.get_count()  # 获取返回的第三个参数
        if all_failed:
            print("所有key都已尝试失败,程序停止")
            break# 如果所有key都失败,则停止程序
        poi_num += count
        print(f"本次共获取{count}个poi数据")
        print(f"总共获取{poi_num}个poi数据")
        # 获取POI详细数据
        dt = par.getPOIs()
        df = pd.DataFrame(dt)
        if len(df) != 0:
            # 如果文件不存在,则在第一次写入时保存列名行
            df.to_csv(filename, header=not file_exists, index=False, encoding='utf_8_sig', mode='a+')
            file_exists = True  # 确保后续写入不会重复保存列名行
            time.sleep(1) # 控制写入频率
        else:
            pass # 跳过空数据

注意事项:使用API时请遵守高德地图的使用条款,以免被封禁IP。

本篇文章来源于微信公众号: 码农设计师

RELATED ARTICLES

欢迎留下您的宝贵建议

Please enter your comment!
Please enter your name here

- Advertisment -

Most Popular

Recent Comments