Source code for urbanpy.routing.routing

import time
import subprocess
import sys
import pathlib
import requests
import googlemaps
import pandas as pd
import numpy as np
import networkx as nx
import osmnx as ox
import geopandas as gpd
from shapely.geometry import Point
from typing import Union, Tuple
from rich.progress import Progress

__all__ = [
    "start_osrm_server",
    "stop_osrm_server",
    "osrm_route",
    "google_maps_dist_matrix",
    "ors_api",
    "compute_osrm_dist_matrix",
    "google_maps_dir_matrix",
    "nx_route",
    "isochrone_from_api",
    "isochrone_from_graph",
]

ROUTING_MODUEL_DIR = pathlib.Path(__file__).parent.resolve()
CONTAINER_NAME = "osrm_routing_server"


def check_container_is_running(container_name: str) -> bool:
    """
    Checks if a container is running

    Parameters
    ----------

    container_name: str
        Name of container to check

    Returns
    -------

    container_running: bool
        True if container is running, False otherwise.

    """
    completed_process = subprocess.run(
        ["docker", "ps"], capture_output=True, check=True
    )
    stdout_str = completed_process.stdout.decode("utf-8")
    container_running = container_name in stdout_str

    return container_running


[docs] def start_osrm_server(country: str, continent: str, profile: str) -> None: """ Download data for OSRM, process it and start a local osrm server Parameters ---------- country: str Which country to download data from. Expected in lower case & dashes replace spaces. continent: str Which continent of the given country. Expected in lower case & dashes replace spaces. profile: str. One of {'foot', 'car', 'bicycle'} Travel mode to use when routing and estimating travel time. Examples -------- >>> urbanpy.routing.start_osrm_server('peru', 'south-america', 'foot') Starting server ... Server was started succesfully. """ container_name = f"{CONTAINER_NAME}_{continent}_{country}_{profile}" container_running = check_container_is_running(container_name) # Check platform if sys.platform in ["darwin", "linux"]: container_check = ["docker", "inspect", container_name] container_start = ["docker", "start", container_name] download_command = [ "bash", str(pathlib.PosixPath(ROUTING_MODUEL_DIR, "unix_download.sh")), CONTAINER_NAME, country, continent, profile, ] else: container_check = ["powershell.exe", "docker", "inspect", container_name] container_start = ["powershell.exe", "docker", "start", container_name] download_command = [ "powershell.exe", str(pathlib.WindowsPath(ROUTING_MODUEL_DIR, "windows_download.ps1")), CONTAINER_NAME, country, continent, profile, ] # Check if container exists: if subprocess.run(container_check, capture_output=True).returncode == 0: if container_running: print("Server is already running.") else: try: print("Starting server ...") subprocess.run(container_start, check=True) time.sleep(5) # Wait server to be prepared to receive requests print("Server was started succesfully") except subprocess.CalledProcessError as error: print( "Something went wrong. Please check if port 5000 is being used or check your docker installation." ) print(f"Error: {error}") else: try: print( f"This is the first time you initialized a server for {country} on {profile}." ) print("Initializing server setup. This may take several minutes ...") print("To view the detailed logs run the following command from terminal:") print( f"$ watch -n 5 tail -20 ~/data/osrm/{continent}/{country}/logs/{profile}.txt" ) subprocess.run(download_command, check=True) time.sleep(5) # Wait server to be prepared to receive requests print("Server was started succesfully") except subprocess.CalledProcessError as error: print( "Something went wrong. Please check if port 5000 is being used or your docker installation." ) print(f"Error: {error}")
[docs] def stop_osrm_server(country: str, continent: str, profile: str) -> None: """ Run docker stop on the server's container. Parameters ---------- country: str Which country the osrm to stop is routing. Expected in lower case & dashes replace spaces. continent: str Continent of the given country. Expected in lower case & dashes replace spaces. profile: str. One of {'foot', 'car', 'bicycle'} Travel mode to use when routing and estimating travel time. Examples -------- >>> urbanpy.routing.stop_osrm_server('peru', 'south-america', 'foot') Server stopped succesfully """ container_name = f"{CONTAINER_NAME}_{continent}_{country}_{profile}" # Check platform if sys.platform in ["darwin", "linux"]: docker_top = ["docker", "top", container_name] docker_stop = ["docker", "stop", container_name] else: docker_top = ["powershell.exe", "docker", "top", container_name] docker_stop = ["powershell.exe", "docker", "stop", container_name] # Check if container exists: if subprocess.run(docker_top, capture_output=True).returncode == 0: if check_container_is_running(container_name) == True: try: subprocess.run(docker_stop, capture_output=True, check=True) # subprocess.run(['docker', 'container', 'rm', 'osrm_routing_server']) print("Server was stoped succesfully") except subprocess.CalledProcessError as error: print( f"Something went wrong. Please check your docker installation.\nError: {error}" ) else: print("Server is not running.") else: print("Server does not exist.")
[docs] def osrm_route( origin: Union[pd.DataFrame, gpd.GeoDataFrame], destination: Union[pd.DataFrame, gpd.GeoDataFrame], ) -> Tuple[float, float]: """ Query an OSRM routing server for routes between an origin and a destination using a specified profile. Travel mode ("foot", "bicycle", or "car") is determined by the profile selected when starting the OSRM server Parameters ---------- origin: DataFrame with columns x and y or Point geometry Input origin in lat lon pairs (y, x) to pass into the routing engine destination: DataFrame with columns x and y or Point geometry Input destination in lat lon pairs (y,x) to pass to the routing engine Returns ------- distance: float Total travel distance from origin to destination in meters duration: float Total travel time in minutes """ orig = f"{origin.x},{origin.y}" dest = f"{destination.x},{destination.y}" # If "profile" is passed in the url the default profile is used by the local osrm server url = f"http://localhost:5000/route/v1/profile/{orig};{dest}" try: response = requests.get(url, params={"overview": "false"}) except requests.exceptions.ConnectionError as e: print("Waiting for server to be ready ...") time.sleep(5) response = requests.get(url, params={"overview": "false"}) try: data = response.json()["routes"][0] distance, duration = data["distance"], data["duration"] return distance, duration except Exception as err: # print(err) return np.nan, np.nan
[docs] def google_maps_dist_matrix( origin, destination, mode: str, api_key: str, **kwargs ) -> Tuple[int, int]: """ Google Maps distance matrix support. Parameters ---------- origin: tuple, list or str Origin for distance calculation. If tuple or list, a matrix for all lat-lon pairs will be computed (origin as rows). If str, google_maps will georeference and then compute. destination: tuple, list or str Origin for distance calculation. If tuple or list, a matrix for all lat-lon pairs will be computed (origin as rows). If str, google_maps will georeference and then compute. mode: str. One of {"driving", "walking", "transit", "bicycling"} Mode for travel time calculation api_key: str Google Maps API key **kwargs Additional keyword arguments for the distance matrix API. See https://github.com/googlemaps/google-maps-services-python/blob/master/googlemaps/distance_matrix.py Returns ------- dist: int Distance for the o/d pair. Depends on metric parameter time: int Travel time duration Examples -------- >>> API_KEY = 'example-key' >>> urbanpy.routing.google_maps_dist_matrix('San Juan de Lurigancho', 'Miraflores', 'walking', API_KEY) (18477, 13494) >>> urbanpy.routing.google_maps_dist_matrix((-12,-77), (-12.01,-77.01), 'walking', API_KEY) (2428, 1838) >>> urbanpy.routing.google_maps_dist_matrix([(-12,-77),(-12.11,-77.01)], [(-12.11,-77.01),(-12,-77)], 'walking', API_KEY) ([[13743, 0], [0, 13720]], [[10232, 0], [0, 10674]]) """ client = googlemaps.Client(key=api_key) try: r = client.distance_matrix(origin, destination, mode=mode, **kwargs) rows = r["rows"] dist, dur = [], [] if len(rows) > 1: for row in rows: dist.append( [element["distance"]["value"] for element in row["elements"]] ) dur.append( [element["duration"]["value"] for element in row["elements"]] ) else: dist = r["rows"][0]["elements"][0]["distance"]["value"] dur = r["rows"][0]["elements"][0]["duration"]["value"] except Exception as err: print(err) dist = None dur = None return dist, dur
[docs] def ors_api(locations, origin, destination, profile, metrics, api_key): """ Interface with OpenRoute Service API for distance matrix computation. Parameters ---------- origin: list Input origin(s) indices in the location array to compute travel times and distances destination: list Input destination(s) indices in the location array to compute travel times and distances profile: str. One of {'driving-car', 'foot-walking', 'cycling-regular'} metrics: list Combination of metrics to compute (distance and travel time, or one of them) api_key: str Authorization API key for OpenRouteService (see API limits) Returns ------- dist: list Distance matrix dur: list Travel time matrix Examples -------- >>> locations = [[9.70093,48.477473],[9.207916,49.153868],[37.573242,55.801281],[115.663757,38.106467]] >>> metrics = ["distance","duration"] >>> sources = [0] >>> destinations = [1,2,3] >>> api_key = ... >>> urbanpy.routing.ors_api(locations, sources, destinations, metrics, 'driving-car', api_key) [[5753.86,88998.08,399003.44]] [[140861.31,2434228.75,1.0262603E7]] """ body = { "locations": locations, "sources": origin, "destinations": destination, "metrics": metrics, } headers = { "Accept": "application/json, application/geo+json, application/gpx+xml, img/png; charset=utf-8", "Authorization": api_key, "Content-Type": "application/json; charset=utf-8", } r = requests.post( f"https://api.openrouteservice.org/v2/matrix/{profile}", json=body, headers=headers, ) if r.status_code == 200: result = r.json() return result["distances"], result["durations"] else: return -1, -1
[docs] def compute_osrm_dist_matrix(origins, destinations): """ Compute distance and travel time origin-destination matrices kahsgfhjasdgfkasjdhjhsdg Parameters ---------- origins: GeoDataFrame or GeoSeries Input Point geometries corresponding to the starting points of a route. destinations: GeoDataFrame or GeoSeries Point geometries corresponding to the end points of a route. Returns ------- dist_matrix: array_like Array with origins as rows and destinations as columns. Distance in meters. dur_matrix: array_like Array with origins as rows and destinations as columns. Duration in minutes. Examples -------- >>> hex = urbanpy.geom.gen_hexagons(8, lima) >>> fs = urbanpy.download.overpass_pois(hex.total_bounds, 'food_supply') >>> urbanpy.routing.start_osrm_server('peru', 'south-america') >>> dist, dur = urbanpy.routing.compute_osrm_dist_matrix(points.head(), fs.head(), 'walking') array([[ 82012.4, 93899.9, 88964.6, 111839.8, 87844. ], [ 26346.3, 23725.6, 19618.9, 38607.9, 22725.6], [ 26504.4, 23883.7, 19777. , 38766. , 22883.7], [ 33314.3, 30693.6, 26586.9, 45575.9, 29693.5], [ 26659.3, 24038.7, 19931.9, 38920.9, 23038.6]]) array([[59093.7, 67666.8, 64123.1, 80592.8, 63331.5], [19027.5, 17094.6, 14152.3, 27797.6, 16402.8], [19141.5, 17208.6, 14266.3, 27911.6, 16516.8], [24044.9, 22112. , 19169.7, 32815. , 21420.2], [19253.1, 17320.2, 14377.9, 28023.2, 16628.4]]) """ dist_matrix = np.zeros(shape=(origins.shape[0], destinations.shape[0])) dur_matrix = np.zeros(shape=(origins.shape[0], destinations.shape[0])) if type(origins) == gpd.GeoSeries: origins = origins.to_frame() if type(destinations) == gpd.GeoSeries: destinations = destinations.to_frame() with Progress() as progress: pb_orig = progress.add_task( total=origins.shape[0], description="[red]Origins processed" ) pb_dest = progress.add_task( total=destinations.shape[0], description="[blue]Destinations" ) for i, orig in origins.iterrows(): for j, dest in destinations.iterrows(): dist, dur = osrm_route(orig.geometry, dest.geometry) dist_matrix[i, j] = dist dur_matrix[i, j] = dur progress.update(pb_dest, advance=1) progress.update(pb_orig, advance=1) return dist_matrix, dur_matrix
[docs] def google_maps_dir_matrix(origin, destination, mode, api_key, **kwargs): """ Google Maps distance matrix support. Parameters ---------- origin: tuple, list or str Origin for distance calculation. If tuple or list, a matrix for all lat-lon pairs will be computed (origin as rows). If str, google_maps will georeference and then compute. destination: tuple, list or str Origin for distance calculation. If tuple or list, a matrix for all lat-lon pairs will be computed (origin as rows). If str, google_maps will georeference and then compute. mode: str. One of {"driving", "walking", "transit", "bicycling"} Mode for travel time calculation api_key: str Google Maps API key **kwargs Additional keyword arguments for the distance matrix API. See https://github.com/googlemaps/google-maps-services-python/blob/master/googlemaps/directions.py Returns ------- dist: int Distance for the o/d pair. Depends on metric parameter dur: int Travel time duration, depends on units kwargs Examples -------- >>> API_KEY = 'example-key' >>> urbanpy.routing.google_maps_dir_matrix('San Juan de Lurigancho', 'Miraflores', 'walking', API_KEY) (18477, 13494) """ client = googlemaps.Client(key=api_key) try: r = client.directions(origin, destination, mode=mode, **kwargs) legs = r[0]["legs"] dist, dur = 0, 0 if len(legs) > 1: for leg in legs: dist += leg["distance"]["value"] dur += leg["duration"]["value"] else: dist = legs[0]["distance"]["value"] dur = legs[0]["duration"]["value"] except Exception as err: print(err) dist = None dur = None return dist, dur
[docs] def nx_route(graph, source, target, weight, length=True): """ Compute shortest path from a source and target node. Parameters ---------- graph: NetworkX Graph Input graph from which to calculate paths source: str or int ID of the source node from which to calculate path. Depending on the graph, this may be a string or integer or a combination of both as tuples. target: str or int ID of the target node. Type corresponds to node id types in the input graph. weight: str Attribute to be used as weights in the path. If None returns the sequence of nodes or the number of nodes to travel as length. length: bool Flag for whether to calculate the path lenght or the sequence of nodes to follow. If weight is none and length is true, the number of nodes in the path will be returned. Returns ------- path: list Sequence of node ids in a list. path_length: float or int Length of the path according to the weight attribute. Examples -------- >>> import osmnx as ox >>> import urbanpy as up >>> from random import choices >>> G = ox.graph_from_place('Lima, Peru') >>> source, target = choices(list(G.nodes), k=2) >>> up.routing.nx_route(G, source, target, 'length') 8348.236999999996 """ if length: try: path_length = nx.shortest_path_length(graph, source, target, weight=weight) return path_length except: # If there is no path within the graph return -1 else: try: path = nx.shortest_path(graph, source, target, weight=weight) return path except: # If there is no path within the graph return -1
[docs] def isochrone_from_api(locations, time_range, profile, api, api_key): """ Get isochrones from either the OpenRouteService or MapBox API Parameters ---------- locations: array_like (float, float) Set of locations from which to calculate the isochrones in lon, lat pairs. Larger sets may exceed API limits. time_range: array_like (int) Time intervals to compute i.e. the travel time ranges for the isochrones. Depends on the API profile: str. Depends on the API. Mobility profile to compute the isochrones api: str. One of {"ors", "mapbox"} API to request the isochrones from. api_key: str API auth key Returns ------- isochrones: GeoDataFrame GeoPandas GeoDataFrame containing the isochrone polygons with columns: contour (time range), group index (isochrone-location matcher) and geometry Examples -------- >>> import urbanpy as up >>> API_KEY = "some_key" >>> up.routing.isochrone_from_api([[8.681495,49.41461],[8.686507,49.41943]], [300,900], 'foot-walking', 'ors', API_KEY) contour | group_index | geometry 300.0 | 0 | POLYGON ((8.67640 49.41485, 8.67643 49.41461, ... 900.0 | 0 | POLYGON ((8.66750 49.41169, 8.66755 49.41164, ... 300.0 | 1 | POLYGON ((8.68103 49.41902, 8.68167 49.41815, ... 900.0 | 1 | POLYGON ((8.67108 49.41982, 8.67109 49.41946, ... >>> up.routing.isochrone_from_api([[8.681495,49.41461],[8.686507,49.41943]], [5,10], 'driving', 'mapbox', API_KEY) contour | group_index | geometry 10 | 0 | POLYGON ((8.68149 49.43661, 8.68100 49.43461, ... 5 | 0 | POLYGON ((8.67850 49.42361, 8.67621 49.42190, ... 10 | 1 | POLYGON ((8.67258 49.44751, 8.67138 49.44743, ... 5 | 1 | POLYGON ((8.68265 49.42957, 8.68151 49.42846, ... """ if api == "ors": body = {"locations": locations, "range": time_range} headers = { "Accept": "application/json, application/geo+json, application/gpx+xml, img/png; charset=utf-8", "Authorization": api_key, "Content-Type": "application/json; charset=utf-8", } call = requests.post( f"https://api.openrouteservice.org/v2/isochrones/{profile}", json=body, headers=headers, ) if call.status_code == 200: isochrones = gpd.GeoDataFrame.from_features(call.json()) isochrones.crs = "EPSG:4326" isochrones.rename(columns={"value": "contour"}, inplace=True) return isochrones[["contour", "group_index", "geometry"]] else: print(f"Request error with HTTP code {call.status_code}: {call.reason}") else: contour_min = ",".join([str(time) for time in time_range]) features = {} # There may be a better solution. Mapbox does not support multiple points in one go for ix, (lon, lat) in enumerate(locations): pair = ",".join([str(lon), str(lat)]) url = f"https://api.mapbox.com/isochrone/v1/mapbox/{profile}/{pair}?contours_minutes={contour_min}&polygons=true&access_token={api_key}" call = requests.get(url) if call.status_code == 200: if features == {}: features_ = call.json() # Assing a group index for the request number (id for the location-contour match) for feature in features_["features"]: feature["properties"]["group_index"] = ix features = features_ else: features_ = call.json()["features"] for feature in features_: feature["properties"]["group_index"] = ix features["features"].extend(features_) else: pass isochrones = gpd.GeoDataFrame.from_features(features) isochrones.crs = "EPSG:4326" return isochrones[["contour", "group_index", "geometry"]]
[docs] def isochrone_from_graph(graph, locations, time_range, profile): """ Create isochrones from a network graph. Parameters ---------- graph: NetworkX or OSMnx graph. Input graph to compute isochrones from. locations: array_like Locations (lon, lat) from which to trace the isochrones. time_range: int Travel time from which to construct the isochrones profile: str or int If str, a default avg. speed will be used to compute the travel time. If int, this value will be used as the avg. speed to compute the isochrones. Returns ------- isochrones: GeoDataFrame GeoDataFrame containing the isochrones for the set of locations and time_ranges. Examples -------- >>> import urbanpy as up >>> import osmnx as ox >>> G = ox.graph_from_place('Berkeley, CA, USA', network_type='walking') >>> up.routing.isochrone_from_graph(G, [[],[]], [5, 10], 'walking') """ profiles = { "driving": 20, "walking": 5, "cycling": 10, } if profile not in profiles and type(profile) == int: travel_speed = profile else: travel_speed = profiles[profile] center_nodes = [ox.nearest_nodes(graph, x, y) for x, y in locations] G = ox.project_graph(graph) meters_per_minute = travel_speed * 1000 / 60 # km per hour to m per minute for u, v, k, data in G.edges(data=True, keys=True): data["time"] = data["length"] / meters_per_minute data = [] for ix, center_node in enumerate(center_nodes): for trip_time in sorted(time_range, reverse=True): subgraph = nx.ego_graph(G, center_node, radius=trip_time, distance="time") node_points = [ Point((data["lon"], data["lat"])) for node, data in subgraph.nodes(data=True) ] bounding_poly = gpd.GeoSeries(node_points).unary_union.convex_hull data.append([ix, trip_time, bounding_poly]) isochrones = gpd.GeoDataFrame(data, columns=["group_index", "contour", "geometry"]) isochrones.crs = "EPSG:4326" return isochrones