
Reference

Config

Source code in overture2hdx/app.py, lines 84-244:
class Config:
    def __init__(
        self,
        config_yaml: str,
        hdx_site: str = None,
        hdx_api_key: str = None,
        hdx_owner_org: str = None,
        hdx_maintainer: str = None,
        overture_version: str = None,
        log_level: str = None,
        log_format: str = None,
        parallel_processing: bool = None,
        max_threads: int = None,
        memory_limit_gb: int = None,
    ):
        """
        Initialize the configuration.

        Args:
            config_yaml (str): YAML configuration string.
            hdx_site (str, optional): HDX site. Defaults to None.
            hdx_api_key (str, optional): HDX API key. Defaults to None.
            hdx_owner_org (str, optional): HDX owner organization. Defaults to None.
            hdx_maintainer (str, optional): HDX maintainer. Defaults to None.
            overture_version (str, optional): Overture release version. Defaults to None.
            log_level (str, optional): Logging level. Defaults to None.
            log_format (str, optional): Logging format. Defaults to None.
            parallel_processing (bool, optional): Enable parallel processing. Defaults to auto-detect.
            max_threads (int, optional): Maximum number of threads. Defaults to auto-detect.
            memory_limit_gb (int, optional): Memory limit in GB. Defaults to auto-detect.
        """
        self.HDX_SITE = hdx_site or os.environ.get("HDX_SITE") or "demo"
        self.HDX_API_KEY = hdx_api_key or os.environ.get("HDX_API_KEY")
        self.HDX_OWNER_ORG = hdx_owner_org or os.environ.get("HDX_OWNER_ORG")
        self.HDX_MAINTAINER = hdx_maintainer or os.environ.get("HDX_MAINTAINER")
        self.OVERTURE_RELEASE_VERSION = overture_version or os.environ.get("OVERTURE_VERSION", "2025-03-19.0")

        # System resources configuration
        self.PARALLEL_PROCESSING = parallel_processing if parallel_processing is not None else True
        self.MAX_THREADS = max_threads or SystemResources.get_optimal_thread_count()
        self.MEMORY_LIMIT_GB = memory_limit_gb or SystemResources.get_optimal_memory_limit()

        self.config = yaml.safe_load(config_yaml)
        self._bbox_cache = None
        self._boundary_gdf_geojson_str_cache = None

        self.validate_config()
        self.setup_config()
        setup_logging(level=log_level, format=log_format)

        # Log system configuration
        logger.info(
            f"System configuration: CPUs={SystemResources.get_cpu_count()}, "
            f"Memory={SystemResources.get_memory_gb():.1f}GB, "
            f"Using {self.MAX_THREADS} threads and {self.MEMORY_LIMIT_GB}GB memory limit"
        )

    def setup_config(self):
        """
        Set up the HDX configuration.

        Raises:
            ValueError: If HDX credentials (API key, owner org, maintainer) are not provided.
        """
        if not (self.HDX_API_KEY and self.HDX_OWNER_ORG and self.HDX_MAINTAINER):
            raise ValueError("HDX credentials (API key, owner org, maintainer) are required")

        self.HDX_URL_PREFIX = Configuration.create(
            hdx_site=self.HDX_SITE,
            hdx_key=self.HDX_API_KEY,
            user_agent="HDXPythonLibrary/6.3.4",
        )
        logger.info(f"Using HDX site: {self.HDX_URL_PREFIX}")

    def validate_config(self):
        """
        Validate the configuration.

        Raises:
            ValueError: If HDX credentials environment variables are not set.
            ValueError: If ISO3 country code is not specified in YAML configuration.
        """
        if not (self.HDX_API_KEY and self.HDX_OWNER_ORG and self.HDX_MAINTAINER):
            raise ValueError("HDX credentials environment variables not set")

        if not self.config.get("iso3"):
            raise ValueError("ISO3 country code must be specified in YAML configuration")

    @property
    def country_code(self):
        return self.config.get("iso3").upper()

    @property
    def geom(self):
        return self.config.get("geom")

    @property
    def hdx_key(self):
        return self.config.get("key")

    @property
    def hdx_subnational(self):
        return self.config.get("subnational", "false")

    @property
    def frequency(self):
        return self.config.get("frequency", "yearly")

    @property
    def categories(self):
        return self.config.get("categories", [])

    @property
    def bbox(self):
        # Use cached value if available
        if self._bbox_cache is not None:
            return self._bbox_cache

        logger.info("Calculating bounding box...")
        if self.geom:
            geom = json.loads(json.dumps(self.geom))
            boundary_gdf = gpd.GeoDataFrame.from_features(geom["features"])
            result = boundary_gdf.total_bounds.tolist()
        else:
            try:
                logger.info("Fetching bounding box from remote source...")
                bbox_response = requests.get(
                    "https://raw.githubusercontent.com/kshitijrajsharma/global-boundaries-bbox/refs/heads/main/bbox.json"
                )
                bbox_response.raise_for_status()
                bbox_data = bbox_response.json()
            except Exception as e:
                logger.error(f"Failed to fetch bbox data: {str(e)}")
                raise Exception(f"Failed to fetch bbox data: {str(e)}")

            if self.country_code not in bbox_data:
                logger.error(f"Invalid country code: {self.country_code}")
                raise ValueError(f"Invalid country code: {self.country_code}")

            result = bbox_data[self.country_code]

        # Cache the result
        self._bbox_cache = result
        logger.info(f"Bounding box: {result}")
        return result

    @property
    def boundary_gdf_geojson_str(self):
        # Use cached value if available
        if self._boundary_gdf_geojson_str_cache is not None:
            return self._boundary_gdf_geojson_str_cache

        if self.geom:
            logger.info("Generating boundary GeoJSON...")
            geom = json.loads(json.dumps(self.geom))
            boundary_gdf = gpd.GeoDataFrame.from_features(geom["features"])
            result = json.dumps(boundary_gdf.geometry.union_all().__geo_interface__)
            # Cache the result
            self._boundary_gdf_geojson_str_cache = result
            return result
        return None

__init__(config_yaml, hdx_site=None, hdx_api_key=None, hdx_owner_org=None, hdx_maintainer=None, overture_version=None, log_level=None, log_format=None, parallel_processing=None, max_threads=None, memory_limit_gb=None)

Initialize the configuration.

Parameters:

config_yaml (str, required): YAML configuration string.
hdx_site (str, optional): HDX site. Defaults to None.
hdx_api_key (str, optional): HDX API key. Defaults to None.
hdx_owner_org (str, optional): HDX owner organization. Defaults to None.
hdx_maintainer (str, optional): HDX maintainer. Defaults to None.
overture_version (str, optional): Overture release version. Defaults to None.
log_level (str, optional): Logging level. Defaults to None.
log_format (str, optional): Logging format. Defaults to None.
parallel_processing (bool, optional): Enable parallel processing. Defaults to auto-detect.
max_threads (int, optional): Maximum number of threads. Defaults to auto-detect.
memory_limit_gb (int, optional): Memory limit in GB. Defaults to auto-detect.
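
A minimal construction sketch (the YAML string and credentials are illustrative placeholders, and Config is assumed to be importable from the overture2hdx.app module shown on this page; construction also validates credentials and registers the HDX configuration, so real values are needed for it to succeed):

import os

from overture2hdx.app import Config

config_yaml = """
iso3: npl
key: overture_example
frequency: yearly
categories: []
"""

config = Config(
    config_yaml=config_yaml,
    hdx_site="demo",
    hdx_api_key=os.environ.get("HDX_API_KEY"),
    hdx_owner_org=os.environ.get("HDX_OWNER_ORG"),
    hdx_maintainer=os.environ.get("HDX_MAINTAINER"),
)
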
Source code in overture2hdx/app.py, lines 85-139:
def __init__(
    self,
    config_yaml: str,
    hdx_site: str = None,
    hdx_api_key: str = None,
    hdx_owner_org: str = None,
    hdx_maintainer: str = None,
    overture_version: str = None,
    log_level: str = None,
    log_format: str = None,
    parallel_processing: bool = None,
    max_threads: int = None,
    memory_limit_gb: int = None,
):
    """
    Initialize the configuration.

    Args:
        config_yaml (str): YAML configuration string.
        hdx_site (str, optional): HDX site. Defaults to None.
        hdx_api_key (str, optional): HDX API key. Defaults to None.
        hdx_owner_org (str, optional): HDX owner organization. Defaults to None.
        hdx_maintainer (str, optional): HDX maintainer. Defaults to None.
        overture_version (str, optional): Overture release version. Defaults to None.
        log_level (str, optional): Logging level. Defaults to None.
        log_format (str, optional): Logging format. Defaults to None.
        parallel_processing (bool, optional): Enable parallel processing. Defaults to auto-detect.
        max_threads (int, optional): Maximum number of threads. Defaults to auto-detect.
        memory_limit_gb (int, optional): Memory limit in GB. Defaults to auto-detect.
    """
    self.HDX_SITE = hdx_site or os.environ.get("HDX_SITE") or "demo"
    self.HDX_API_KEY = hdx_api_key or os.environ.get("HDX_API_KEY")
    self.HDX_OWNER_ORG = hdx_owner_org or os.environ.get("HDX_OWNER_ORG")
    self.HDX_MAINTAINER = hdx_maintainer or os.environ.get("HDX_MAINTAINER")
    self.OVERTURE_RELEASE_VERSION = overture_version or os.environ.get("OVERTURE_VERSION", "2025-03-19.0")

    # System resources configuration
    self.PARALLEL_PROCESSING = parallel_processing if parallel_processing is not None else True
    self.MAX_THREADS = max_threads or SystemResources.get_optimal_thread_count()
    self.MEMORY_LIMIT_GB = memory_limit_gb or SystemResources.get_optimal_memory_limit()

    self.config = yaml.safe_load(config_yaml)
    self._bbox_cache = None
    self._boundary_gdf_geojson_str_cache = None

    self.validate_config()
    self.setup_config()
    setup_logging(level=log_level, format=log_format)

    # Log system configuration
    logger.info(
        f"System configuration: CPUs={SystemResources.get_cpu_count()}, "
        f"Memory={SystemResources.get_memory_gb():.1f}GB, "
        f"Using {self.MAX_THREADS} threads and {self.MEMORY_LIMIT_GB}GB memory limit"
    )

setup_config()

Set up the HDX configuration.

Raises:

ValueError: If HDX credentials (API key, owner org, maintainer) are not provided.

Source code in overture2hdx/app.py, lines 141-156:
def setup_config(self):
    """
    Set up the HDX configuration.

    Raises:
        ValueError: If HDX credentials (API key, owner org, maintainer) are not provided.
    """
    if not (self.HDX_API_KEY and self.HDX_OWNER_ORG and self.HDX_MAINTAINER):
        raise ValueError("HDX credentials (API key, owner org, maintainer) are required")

    self.HDX_URL_PREFIX = Configuration.create(
        hdx_site=self.HDX_SITE,
        hdx_key=self.HDX_API_KEY,
        user_agent="HDXPythonLibrary/6.3.4",
    )
    logger.info(f"Using HDX site: {self.HDX_URL_PREFIX}")

validate_config()

Validate the configuration.

Raises:

ValueError: If HDX credentials environment variables are not set.
ValueError: If ISO3 country code is not specified in YAML configuration.

Source code in overture2hdx/app.py, lines 158-170:
def validate_config(self):
    """
    Validate the configuration.

    Raises:
        ValueError: If HDX credentials environment variables are not set.
        ValueError: If ISO3 country code is not specified in YAML configuration.
    """
    if not (self.HDX_API_KEY and self.HDX_OWNER_ORG and self.HDX_MAINTAINER):
        raise ValueError("HDX credentials environment variables not set")

    if not self.config.get("iso3"):
        raise ValueError("ISO3 country code must be specified in YAML configuration")

Exporter

A class to export map data from OvertureMaps to various formats and upload to HDX. Enhanced with parallel processing and performance optimizations.
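
A usage sketch, assuming a Config has been built as in the Config section above (the heading refers to the class as Exporter; in app.py it is defined as OvertureMapExporter):

from overture2hdx.app import OvertureMapExporter

exporter = OvertureMapExporter(config)   # optionally pass duckdb_con=":memory:" or a file path
results = exporter.export()              # maps each category name to "Success" or "Failed: <reason>"
print(results)
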

Source code in overture2hdx/app.py, lines 247-698:
class OvertureMapExporter:
    """
    A class to export map data from OvertureMaps to various formats and upload to HDX.
    Enhanced with parallel processing and performance optimizations.
    """

    def __init__(self, config: Config, duckdb_con: str = None):
        self.config = config
        self.duck_con = duckdb_con or os.environ.get("DUCKDB_CON", ":memory:")
        self.conn = duckdb.connect(self.duck_con)
        # Stats for performance monitoring
        self.stats = {
            "start_time": None,
            "end_time": None,
            "categories_processed": 0,
            "failed_categories": 0,
            "total_export_size_mb": 0,
        }

    def setup_duckdb(self, conn):
        """Configure DuckDB with optimal settings based on system resources"""
        setup_queries = [
            "INSTALL spatial",
            "INSTALL httpfs",
            "LOAD spatial",
            "LOAD httpfs",
            "SET s3_region='us-west-2'",
            f"PRAGMA memory_limit='{self.config.MEMORY_LIMIT_GB}GB'",
            f"PRAGMA threads={max(2, self.config.MAX_THREADS - 1)}",
            "PRAGMA enable_object_cache",
            "PRAGMA temp_directory='/tmp/duckdb_temp'",
        ]

        # Create temp directory if it doesn't exist
        os.makedirs("/tmp/duckdb_temp", exist_ok=True)

        for query in setup_queries:
            try:
                conn.execute(query)
                logger.debug(f"Executed DuckDB setup query: {query}")
            except Exception as e:
                logger.warning(f"Failed to execute DuckDB setup query '{query}': {str(e)}")

    def slugify(self, s):
        return re.sub(r"[^a-zA-Z0-9]+", "_", s).lower()

    def build_select_clause(self, select_fields: List[str]) -> str:
        fields = select_fields + ["geometry as geom"]
        return ",\n       ".join(fields)

    def build_where_clause(self, where_conditions: List[str]) -> str:
        bbox_conditions = f"""
            bbox.xmin >= {self.config.bbox[0]} AND
            bbox.xmax <= {self.config.bbox[2]} AND
            bbox.ymin >= {self.config.bbox[1]} AND
            bbox.ymax <= {self.config.bbox[3]}
        """

        if self.config.boundary_gdf_geojson_str:
            bbox_conditions = (
                f"({bbox_conditions}) AND ST_Intersects(geom, ST_GeomFromGeoJSON('{self.config.boundary_gdf_geojson_str}'))"
            )

        if where_conditions:
            custom_conditions = " AND ".join(f"({condition})" for condition in where_conditions)
            return f"({bbox_conditions}) AND ({custom_conditions})"

        return bbox_conditions

    def file_to_zip(self, working_dir, zip_path):
        """Optimized method to create a zip file from a directory"""
        logger.info(f"Creating zip file: {zip_path}")
        buffer_size = 4 * 1024 * 1024  # 4MB buffer for better I/O performance

        with zipfile.ZipFile(
            zip_path,
            "w",
            compression=zipfile.ZIP_DEFLATED,
            allowZip64=True,
            compresslevel=1,  # Faster compression
        ) as zf:
            for file_path in pathlib.Path(working_dir).iterdir():
                logger.debug(f"Adding file to zip: {file_path}")
                file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
                if file_size_mb > 100:  # For large files, use streaming
                    with open(file_path, "rb") as f:
                        with zf.open(file_path.name, "w", force_zip64=True) as dest:
                            shutil.copyfileobj(f, dest, buffer_size)
                else:
                    zf.write(file_path, arcname=file_path.name)

            # Add metadata
            utc_now = datetime.now(timezone.utc)
            utc_offset = utc_now.strftime("%z")
            readme_content = (
                f"Exported using overture2hdx lib : {__version__}\n"
                f"Timestamp (UTC{utc_offset}): {utc_now.strftime('%Y-%m-%d %H:%M:%S')}\n"
                f"Data Source: https://overturemaps.org/\n"
                f"Release: {self.config.OVERTURE_RELEASE_VERSION}\n"
                f"Country: {self.config.country_code}\n"
                f"Bounding Box: {self.config.bbox}"
            )
            zf.writestr("Readme.txt", readme_content)
            zf.writestr("config.yaml", yaml.dump(self.config.config))

        # Calculate zip size for statistics
        zip_size_mb = os.path.getsize(zip_path) / (1024 * 1024)
        self.stats["total_export_size_mb"] += zip_size_mb
        logger.info(f"Created zip file: {zip_path} ({zip_size_mb:.2f} MB)")

        # Clean up working directory
        shutil.rmtree(working_dir)
        return zip_path

    def cleanup(self, zip_paths):
        """Remove temporary zip files"""
        for zip_path in zip_paths:
            try:
                if os.path.exists(zip_path):
                    os.remove(zip_path)
                    logger.debug(f"Removed temporary file: {zip_path}")
            except Exception as e:
                logger.warning(f"Failed to remove temporary file {zip_path}: {str(e)}")

    def export_shapefile(self, category_conn, table_name, category_name, dir_path):
        """
        Export data to shapefile format, handling different geometry types separately.

        Args:
            category_conn: DuckDB connection
            table_name: Name of the table containing the data
            category_name: Name of the category being exported
            dir_path: Directory to save the shapefiles

        Returns:
            bool: True if export was successful
        """
        logger.info("For shapefile format, separating by geometry type")

        # Get all geometry types in the dataset
        geom_types_query = f"""
        SELECT DISTINCT ST_GeometryType(geom) as geom_type 
        FROM {table_name}
        """
        geom_types = [row[0] for row in category_conn.execute(geom_types_query).fetchall()]
        logger.info(f"Found geometry types: {', '.join(geom_types)}")

        if not geom_types:
            logger.warning("No geometry types found in data")
            return False

        # Map ST_ types to simpler names for filenames
        geom_type_mapping = {
            "ST_Point": "points",
            "ST_MultiPoint": "points",
            "ST_LineString": "lines",
            "ST_MultiLineString": "lines",
            "ST_Polygon": "polygons",
            "ST_MultiPolygon": "polygons",
        }

        # Process each geometry type separately
        exported_count = 0
        for geom_type in geom_types:
            # Get a simplified name for the geometry type
            simple_type = geom_type_mapping.get(geom_type, "other")
            export_filename = f"{dir_path}/{category_name}_{simple_type}.shp"

            logger.info(f"Exporting {geom_type} features to {export_filename}")

            export_start = time.time()
            try:
                # Export just this geometry type
                category_conn.execute(
                    f"""
                COPY (
                    SELECT * FROM {table_name}
                    WHERE ST_GeometryType(geom) = '{geom_type}'
                ) TO '{export_filename}' 
                WITH (FORMAT GDAL, SRS 'EPSG:4326', DRIVER 'ESRI Shapefile', LAYER_CREATION_OPTIONS 'ENCODING=UTF-8,2GB_LIMIT=No')
                """
                )

                export_time = time.time() - export_start
                logger.info(f"Export of {simple_type} completed in {export_time:.2f}s")
                exported_count += 1
            except Exception as e:
                logger.error(f"Failed to export {geom_type}: {str(e)}")

        return exported_count > 0

    def process_category(self, category_dict) -> Tuple[str, str, List[str]]:
        """
        Process a single category and return results.

        This is designed to be run in parallel for multiple categories.
        """
        category_name = list(category_dict.keys())[0]
        category_conn = duckdb.connect(self.duck_con)
        self.setup_duckdb(category_conn)

        try:
            logger.info(f"Starting processing of category: {category_name}")
            category_config = category_dict[category_name]
            theme = category_config["theme"][0]
            feature_type = category_config["feature_type"][0]
            select_fields = category_config["select"]
            where_conditions = category_config.get("where", [])
            output_formats = category_config.get("formats", [])
            hdx = category_config.get("hdx")
            hdx_title = hdx.get("title")
            hdx_notes = hdx.get("notes", "Overturemaps Export to use in GIS applications")
            hdx_tags = hdx.get("tags", ["geodata"])
            hdx_caveats = hdx.get(
                "caveats",
                "This is verified by the community overall only but still might have some issues in individual level",
            )
            hdx_license = hdx.get(
                "license",
                "hdx-odc-odbl",
            )

            select_clause = self.build_select_clause(select_fields)
            where_clause = self.build_where_clause(where_conditions)

            # Unique table name to avoid conflicts in parallel processing
            table_name = f"{self.slugify(category_name)}_{os.getpid()}"

            query = f"""
            CREATE OR REPLACE TABLE {table_name} AS (
            SELECT
                {select_clause}
            FROM read_parquet(
                's3://overturemaps-us-west-2/release/{self.config.OVERTURE_RELEASE_VERSION}/theme={theme}/type={feature_type}/*',
                filename=true,
                hive_partitioning=1
            )
            WHERE {where_clause} )
            """

            logger.info(f"Executing DuckDB query for {category_name}")
            logger.debug(f"Query for {category_name}: {query}")

            start_time = time.time()
            category_conn.execute(query)
            query_time = time.time() - start_time

            # Check if any data was returned
            count_result = category_conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
            feature_count = count_result[0] if count_result else 0

            logger.info(f"Query for {category_name} completed in {query_time:.2f}s, found {feature_count} features")

            if feature_count == 0:
                logger.warning(f"No features found for {category_name} with the specified criteria")

            dt_name = f"{self.config.hdx_key}_{self.config.country_code.lower()}_{self.slugify(category_name)}"

            dataset_args = {
                "title": hdx_title,
                "name": dt_name,
                "notes": hdx_notes,
                "caveats": hdx_caveats,
                "private": False,
                "dataset_source": "OvertureMap",
                "methodology": "Other",
                "methodology_other": "Open Source Geographic information",
                "owner_org": self.config.HDX_OWNER_ORG,
                "maintainer": self.config.HDX_MAINTAINER,
                "subnational": self.config.hdx_subnational,
            }

            # Handle different license types
            if hdx_license == "hdx-odc-odbl":
                dataset_args["license_id"] = hdx_license
            else:
                # Custom license - use "hdx-other" and specify in "license_other"
                dataset_args["license_id"] = "hdx-other"
                dataset_args["license_other"] = hdx_license

            # Create HDX dataset
            dataset = Dataset(
                {
                    "title": hdx_title,
                    "name": dt_name,
                    "notes": hdx_notes,
                    "caveats": hdx_caveats,
                    "private": False,
                    "dataset_source": "OvertureMap",
                    "methodology": "Other",
                    "methodology_other": "Open Source Geographic information",
                    "license_id": hdx_license,
                    "owner_org": self.config.HDX_OWNER_ORG,
                    "maintainer": self.config.HDX_MAINTAINER,
                    "subnational": self.config.hdx_subnational,
                }
            )
            dataset.set_time_period(datetime.strptime(self.config.OVERTURE_RELEASE_VERSION.split(".")[0], "%Y-%m-%d"))
            dataset.set_expected_update_frequency(self.config.frequency)
            dataset.add_other_location(self.config.country_code)
            for tag in hdx_tags:
                dataset.add_tag(tag)

            logger.info(f"Creating HDX dataset for {category_name}")
            dataset.create_in_hdx(allow_no_resources=True)

            format_drivers = {
                "geojson": "GeoJSON",
                "gpkg": "GPKG",
                "shp": "ESRI Shapefile",
            }
            zip_paths = []

            # # Add a  index to improve export performance if the table is large , disabled for now as i don't see any performance improvement
            # if feature_count > 10000:
            #     try:
            #         logger.info(f"Creating index for {table_name}")
            #         category_conn.execute(f"CREATE INDEX idx_{table_name}_geom ON {table_name} (geom)")
            #     except Exception as e:
            #         logger.warning(f"Failed to create index for {table_name}: {str(e)}")

            for fmt in output_formats:
                try:
                    logger.info(f"Exporting {category_name} to {fmt} format")
                    dir_path = f"{os.getcwd()}/{category_name}_{fmt}_{os.getpid()}"
                    os.makedirs(dir_path, exist_ok=True)
                    filename = f"{dir_path}/{category_name}.{fmt}"

                    export_start = time.time()
                    if fmt == "shp":
                        # Special handling for shapefiles
                        success = self.export_shapefile(category_conn, table_name, category_name, dir_path)
                        if not success:
                            logger.error(f"Failed to export any shapefile data for {category_name}")
                            continue
                    else:
                        # Standard export for other formats
                        filename = f"{dir_path}/{category_name}.{fmt}"
                        category_conn.execute(
                            f"COPY {table_name} TO '{filename}' WITH (FORMAT GDAL, SRS 'EPSG:4326', DRIVER '{format_drivers.get(fmt)}', LAYER_CREATION_OPTIONS 'ENCODING=UTF-8')"
                        )
                    export_time = time.time() - export_start

                    logger.info(f"Export to {fmt} completed in {export_time:.2f}s")

                    zip_name = f"{dt_name}_{fmt}.zip".lower()
                    zip_path = self.file_to_zip(dir_path, zip_name)
                    zip_paths.append(zip_path)

                    resource = Resource(
                        {
                            "name": zip_name,
                            "description": f"{category_name} data in {fmt.upper()} format",
                        }
                    )
                    resource.set_format(fmt)
                    resource.set_file_to_upload(zip_path)

                    logger.info(f"Adding resource to HDX dataset: {zip_name}")
                    dataset.add_update_resource(resource)
                    dataset.update_in_hdx()
                except Exception as e:
                    logger.error(f"Error exporting {category_name} to {fmt}: {str(e)}")
                    raise

            # Final update and cleanup
            dataset.update_in_hdx()
            category_conn.execute(f"DROP TABLE IF EXISTS {table_name}")
            category_conn.close()

            logger.info(f"Successfully processed category: {category_name}")
            return category_name, "Success", zip_paths
        except Exception as e:
            logger.error(f"Error processing category {category_name}: {str(e)}", exc_info=True)
            try:
                category_conn.close()
            except:
                pass
            return category_name, f"Failed: {str(e)}", []

    def export(self) -> Dict:
        """
        Execute the export process with parallel processing and performance optimizations.
        """
        self.stats["start_time"] = time.time()
        logger.info(f"Starting export process with {len(self.config.categories)} categories")
        logger.info(f"System configuration: {self.config.MAX_THREADS} threads, {self.config.MEMORY_LIMIT_GB}GB memory limit")

        # Setup DuckDB for main connection
        self.setup_duckdb(self.conn)

        results = {}
        zip_paths_to_cleanup = []

        if self.config.PARALLEL_PROCESSING and len(self.config.categories) > 1:
            logger.info(f"Using parallel processing with {min(self.config.MAX_THREADS, len(self.config.categories))} workers")

            # Use ThreadPoolExecutor for parallel processing
            with concurrent.futures.ThreadPoolExecutor(
                max_workers=min(self.config.MAX_THREADS, len(self.config.categories))
            ) as executor:
                future_to_category = {
                    executor.submit(self.process_category, category_dict): list(category_dict.keys())[0]
                    for category_dict in self.config.categories
                }

                for future in tqdm(
                    concurrent.futures.as_completed(future_to_category),
                    desc="Processing Categories",
                    total=len(self.config.categories),
                ):
                    category_name = future_to_category[future]
                    try:
                        name, status, paths = future.result()
                        results[name] = status
                        zip_paths_to_cleanup.extend(paths)
                        self.stats["categories_processed"] += 1
                        if "Failed" in status:
                            self.stats["failed_categories"] += 1
                    except Exception as exc:
                        logger.error(f"{category_name} generated an exception: {exc}", exc_info=True)
                        results[category_name] = f"Failed: {str(exc)}"
                        self.stats["failed_categories"] += 1
        else:
            # Sequential processing for single category or when parallel is disabled
            logger.info("Using sequential processing")
            for category_dict in tqdm(self.config.categories, desc="Categories"):
                category_name, status, paths = self.process_category(category_dict)
                results[category_name] = status
                zip_paths_to_cleanup.extend(paths)
                self.stats["categories_processed"] += 1
                if "Failed" in status:
                    self.stats["failed_categories"] += 1

        # Cleanup and close
        self.cleanup(zip_paths_to_cleanup)
        try:
            self.conn.close()
        except Exception as e:
            logger.warning(f"Error closing main DuckDB connection: {str(e)}")

        # Calculate statistics
        self.stats["end_time"] = time.time()
        elapsed_time = self.stats["end_time"] - self.stats["start_time"]
        success_count = self.stats["categories_processed"] - self.stats["failed_categories"]

        logger.info(f"Export completed in {elapsed_time:.2f}s")
        logger.info(f"Categories processed: {self.stats['categories_processed']}")
        logger.info(f"Successful: {success_count}, Failed: {self.stats['failed_categories']}")
        logger.info(f"Total export size: {self.stats['total_export_size_mb']:.2f} MB")

        return results

cleanup(zip_paths)

Remove temporary zip files

Source code in overture2hdx/app.py, lines 361-369:
def cleanup(self, zip_paths):
    """Remove temporary zip files"""
    for zip_path in zip_paths:
        try:
            if os.path.exists(zip_path):
                os.remove(zip_path)
                logger.debug(f"Removed temporary file: {zip_path}")
        except Exception as e:
            logger.warning(f"Failed to remove temporary file {zip_path}: {str(e)}")

export()

Execute the export process with parallel processing and performance optimizations.
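
The return value is a dict keyed by category name whose values are "Success" or "Failed: <reason>" strings (see process_category), so callers can check outcomes along these lines (sketch):

results = exporter.export()
failed = {name: status for name, status in results.items() if status != "Success"}
for name, status in failed.items():
    print(f"{name}: {status}")
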

Source code in overture2hdx/app.py, lines 627-698:
def export(self) -> Dict:
    """
    Execute the export process with parallel processing and performance optimizations.
    """
    self.stats["start_time"] = time.time()
    logger.info(f"Starting export process with {len(self.config.categories)} categories")
    logger.info(f"System configuration: {self.config.MAX_THREADS} threads, {self.config.MEMORY_LIMIT_GB}GB memory limit")

    # Setup DuckDB for main connection
    self.setup_duckdb(self.conn)

    results = {}
    zip_paths_to_cleanup = []

    if self.config.PARALLEL_PROCESSING and len(self.config.categories) > 1:
        logger.info(f"Using parallel processing with {min(self.config.MAX_THREADS, len(self.config.categories))} workers")

        # Use ThreadPoolExecutor for parallel processing
        with concurrent.futures.ThreadPoolExecutor(
            max_workers=min(self.config.MAX_THREADS, len(self.config.categories))
        ) as executor:
            future_to_category = {
                executor.submit(self.process_category, category_dict): list(category_dict.keys())[0]
                for category_dict in self.config.categories
            }

            for future in tqdm(
                concurrent.futures.as_completed(future_to_category),
                desc="Processing Categories",
                total=len(self.config.categories),
            ):
                category_name = future_to_category[future]
                try:
                    name, status, paths = future.result()
                    results[name] = status
                    zip_paths_to_cleanup.extend(paths)
                    self.stats["categories_processed"] += 1
                    if "Failed" in status:
                        self.stats["failed_categories"] += 1
                except Exception as exc:
                    logger.error(f"{category_name} generated an exception: {exc}", exc_info=True)
                    results[category_name] = f"Failed: {str(exc)}"
                    self.stats["failed_categories"] += 1
    else:
        # Sequential processing for single category or when parallel is disabled
        logger.info("Using sequential processing")
        for category_dict in tqdm(self.config.categories, desc="Categories"):
            category_name, status, paths = self.process_category(category_dict)
            results[category_name] = status
            zip_paths_to_cleanup.extend(paths)
            self.stats["categories_processed"] += 1
            if "Failed" in status:
                self.stats["failed_categories"] += 1

    # Cleanup and close
    self.cleanup(zip_paths_to_cleanup)
    try:
        self.conn.close()
    except Exception as e:
        logger.warning(f"Error closing main DuckDB connection: {str(e)}")

    # Calculate statistics
    self.stats["end_time"] = time.time()
    elapsed_time = self.stats["end_time"] - self.stats["start_time"]
    success_count = self.stats["categories_processed"] - self.stats["failed_categories"]

    logger.info(f"Export completed in {elapsed_time:.2f}s")
    logger.info(f"Categories processed: {self.stats['categories_processed']}")
    logger.info(f"Successful: {success_count}, Failed: {self.stats['failed_categories']}")
    logger.info(f"Total export size: {self.stats['total_export_size_mb']:.2f} MB")

    return results

export_shapefile(category_conn, table_name, category_name, dir_path)

Export data to shapefile format, handling different geometry types separately.

Parameters:

category_conn (required): DuckDB connection
table_name (required): Name of the table containing the data
category_name (required): Name of the category being exported
dir_path (required): Directory to save the shapefiles

Returns:

bool: True if export was successful

Source code in overture2hdx/app.py, lines 371-436:
def export_shapefile(self, category_conn, table_name, category_name, dir_path):
    """
    Export data to shapefile format, handling different geometry types separately.

    Args:
        category_conn: DuckDB connection
        table_name: Name of the table containing the data
        category_name: Name of the category being exported
        dir_path: Directory to save the shapefiles

    Returns:
        bool: True if export was successful
    """
    logger.info("For shapefile format, separating by geometry type")

    # Get all geometry types in the dataset
    geom_types_query = f"""
    SELECT DISTINCT ST_GeometryType(geom) as geom_type 
    FROM {table_name}
    """
    geom_types = [row[0] for row in category_conn.execute(geom_types_query).fetchall()]
    logger.info(f"Found geometry types: {', '.join(geom_types)}")

    if not geom_types:
        logger.warning("No geometry types found in data")
        return False

    # Map ST_ types to simpler names for filenames
    geom_type_mapping = {
        "ST_Point": "points",
        "ST_MultiPoint": "points",
        "ST_LineString": "lines",
        "ST_MultiLineString": "lines",
        "ST_Polygon": "polygons",
        "ST_MultiPolygon": "polygons",
    }

    # Process each geometry type separately
    exported_count = 0
    for geom_type in geom_types:
        # Get a simplified name for the geometry type
        simple_type = geom_type_mapping.get(geom_type, "other")
        export_filename = f"{dir_path}/{category_name}_{simple_type}.shp"

        logger.info(f"Exporting {geom_type} features to {export_filename}")

        export_start = time.time()
        try:
            # Export just this geometry type
            category_conn.execute(
                f"""
            COPY (
                SELECT * FROM {table_name}
                WHERE ST_GeometryType(geom) = '{geom_type}'
            ) TO '{export_filename}' 
            WITH (FORMAT GDAL, SRS 'EPSG:4326', DRIVER 'ESRI Shapefile', LAYER_CREATION_OPTIONS 'ENCODING=UTF-8,2GB_LIMIT=No')
            """
            )

            export_time = time.time() - export_start
            logger.info(f"Export of {simple_type} completed in {export_time:.2f}s")
            exported_count += 1
        except Exception as e:
            logger.error(f"Failed to export {geom_type}: {str(e)}")

    return exported_count > 0

file_to_zip(working_dir, zip_path)

Optimized method to create a zip file from a directory

Source code in overture2hdx/app.py, lines 316-359:
def file_to_zip(self, working_dir, zip_path):
    """Optimized method to create a zip file from a directory"""
    logger.info(f"Creating zip file: {zip_path}")
    buffer_size = 4 * 1024 * 1024  # 4MB buffer for better I/O performance

    with zipfile.ZipFile(
        zip_path,
        "w",
        compression=zipfile.ZIP_DEFLATED,
        allowZip64=True,
        compresslevel=1,  # Faster compression
    ) as zf:
        for file_path in pathlib.Path(working_dir).iterdir():
            logger.debug(f"Adding file to zip: {file_path}")
            file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
            if file_size_mb > 100:  # For large files, use streaming
                with open(file_path, "rb") as f:
                    with zf.open(file_path.name, "w", force_zip64=True) as dest:
                        shutil.copyfileobj(f, dest, buffer_size)
            else:
                zf.write(file_path, arcname=file_path.name)

        # Add metadata
        utc_now = datetime.now(timezone.utc)
        utc_offset = utc_now.strftime("%z")
        readme_content = (
            f"Exported using overture2hdx lib : {__version__}\n"
            f"Timestamp (UTC{utc_offset}): {utc_now.strftime('%Y-%m-%d %H:%M:%S')}\n"
            f"Data Source: https://overturemaps.org/\n"
            f"Release: {self.config.OVERTURE_RELEASE_VERSION}\n"
            f"Country: {self.config.country_code}\n"
            f"Bounding Box: {self.config.bbox}"
        )
        zf.writestr("Readme.txt", readme_content)
        zf.writestr("config.yaml", yaml.dump(self.config.config))

    # Calculate zip size for statistics
    zip_size_mb = os.path.getsize(zip_path) / (1024 * 1024)
    self.stats["total_export_size_mb"] += zip_size_mb
    logger.info(f"Created zip file: {zip_path} ({zip_size_mb:.2f} MB)")

    # Clean up working directory
    shutil.rmtree(working_dir)
    return zip_path

process_category(category_dict)

Process a single category and return results.

This is designed to be run in parallel for multiple categories.
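
Each element of categories in the YAML configuration is a single-key mapping whose value provides the fields read here: theme, feature_type, select, and the optional where, formats, and hdx blocks. An illustrative entry (field values are placeholders, not a recommended configuration):

categories:
  - buildings:
      theme:
        - buildings
      feature_type:
        - building
      select:
        - id
        - names.primary as name
      where:
        - names.primary IS NOT NULL
      formats:
        - gpkg
        - shp
      hdx:
        title: Example Buildings Export
        notes: Illustrative description
        tags:
          - geodata
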

Source code in overture2hdx/app.py, lines 438-625:
def process_category(self, category_dict) -> Tuple[str, str, List[str]]:
    """
    Process a single category and return results.

    This is designed to be run in parallel for multiple categories.
    """
    category_name = list(category_dict.keys())[0]
    category_conn = duckdb.connect(self.duck_con)
    self.setup_duckdb(category_conn)

    try:
        logger.info(f"Starting processing of category: {category_name}")
        category_config = category_dict[category_name]
        theme = category_config["theme"][0]
        feature_type = category_config["feature_type"][0]
        select_fields = category_config["select"]
        where_conditions = category_config.get("where", [])
        output_formats = category_config.get("formats", [])
        hdx = category_config.get("hdx")
        hdx_title = hdx.get("title")
        hdx_notes = hdx.get("notes", "Overturemaps Export to use in GIS applications")
        hdx_tags = hdx.get("tags", ["geodata"])
        hdx_caveats = hdx.get(
            "caveats",
            "This is verified by the community overall only but still might have some issues in individual level",
        )
        hdx_license = hdx.get(
            "license",
            "hdx-odc-odbl",
        )

        select_clause = self.build_select_clause(select_fields)
        where_clause = self.build_where_clause(where_conditions)

        # Unique table name to avoid conflicts in parallel processing
        table_name = f"{self.slugify(category_name)}_{os.getpid()}"

        query = f"""
        CREATE OR REPLACE TABLE {table_name} AS (
        SELECT
            {select_clause}
        FROM read_parquet(
            's3://overturemaps-us-west-2/release/{self.config.OVERTURE_RELEASE_VERSION}/theme={theme}/type={feature_type}/*',
            filename=true,
            hive_partitioning=1
        )
        WHERE {where_clause} )
        """

        logger.info(f"Executing DuckDB query for {category_name}")
        logger.debug(f"Query for {category_name}: {query}")

        start_time = time.time()
        category_conn.execute(query)
        query_time = time.time() - start_time

        # Check if any data was returned
        count_result = category_conn.execute(f"SELECT COUNT(*) FROM {table_name}").fetchone()
        feature_count = count_result[0] if count_result else 0

        logger.info(f"Query for {category_name} completed in {query_time:.2f}s, found {feature_count} features")

        if feature_count == 0:
            logger.warning(f"No features found for {category_name} with the specified criteria")

        dt_name = f"{self.config.hdx_key}_{self.config.country_code.lower()}_{self.slugify(category_name)}"

        dataset_args = {
            "title": hdx_title,
            "name": dt_name,
            "notes": hdx_notes,
            "caveats": hdx_caveats,
            "private": False,
            "dataset_source": "OvertureMap",
            "methodology": "Other",
            "methodology_other": "Open Source Geographic information",
            "owner_org": self.config.HDX_OWNER_ORG,
            "maintainer": self.config.HDX_MAINTAINER,
            "subnational": self.config.hdx_subnational,
        }

        # Handle different license types
        if hdx_license == "hdx-odc-odbl":
            dataset_args["license_id"] = hdx_license
        else:
            # Custom license - use "hdx-other" and specify in "license_other"
            dataset_args["license_id"] = "hdx-other"
            dataset_args["license_other"] = hdx_license

        # Create HDX dataset
        dataset = Dataset(
            {
                "title": hdx_title,
                "name": dt_name,
                "notes": hdx_notes,
                "caveats": hdx_caveats,
                "private": False,
                "dataset_source": "OvertureMap",
                "methodology": "Other",
                "methodology_other": "Open Source Geographic information",
                "license_id": hdx_license,
                "owner_org": self.config.HDX_OWNER_ORG,
                "maintainer": self.config.HDX_MAINTAINER,
                "subnational": self.config.hdx_subnational,
            }
        )
        dataset.set_time_period(datetime.strptime(self.config.OVERTURE_RELEASE_VERSION.split(".")[0], "%Y-%m-%d"))
        dataset.set_expected_update_frequency(self.config.frequency)
        dataset.add_other_location(self.config.country_code)
        for tag in hdx_tags:
            dataset.add_tag(tag)

        logger.info(f"Creating HDX dataset for {category_name}")
        dataset.create_in_hdx(allow_no_resources=True)

        format_drivers = {
            "geojson": "GeoJSON",
            "gpkg": "GPKG",
            "shp": "ESRI Shapefile",
        }
        zip_paths = []

        # # Add a  index to improve export performance if the table is large , disabled for now as i don't see any performance improvement
        # if feature_count > 10000:
        #     try:
        #         logger.info(f"Creating index for {table_name}")
        #         category_conn.execute(f"CREATE INDEX idx_{table_name}_geom ON {table_name} (geom)")
        #     except Exception as e:
        #         logger.warning(f"Failed to create index for {table_name}: {str(e)}")

        for fmt in output_formats:
            try:
                logger.info(f"Exporting {category_name} to {fmt} format")
                dir_path = f"{os.getcwd()}/{category_name}_{fmt}_{os.getpid()}"
                os.makedirs(dir_path, exist_ok=True)
                filename = f"{dir_path}/{category_name}.{fmt}"

                export_start = time.time()
                if fmt == "shp":
                    # Special handling for shapefiles
                    success = self.export_shapefile(category_conn, table_name, category_name, dir_path)
                    if not success:
                        logger.error(f"Failed to export any shapefile data for {category_name}")
                        continue
                else:
                    # Standard export for other formats
                    filename = f"{dir_path}/{category_name}.{fmt}"
                    category_conn.execute(
                        f"COPY {table_name} TO '{filename}' WITH (FORMAT GDAL, SRS 'EPSG:4326', DRIVER '{format_drivers.get(fmt)}', LAYER_CREATION_OPTIONS 'ENCODING=UTF-8')"
                    )
                export_time = time.time() - export_start

                logger.info(f"Export to {fmt} completed in {export_time:.2f}s")

                zip_name = f"{dt_name}_{fmt}.zip".lower()
                zip_path = self.file_to_zip(dir_path, zip_name)
                zip_paths.append(zip_path)

                resource = Resource(
                    {
                        "name": zip_name,
                        "description": f"{category_name} data in {fmt.upper()} format",
                    }
                )
                resource.set_format(fmt)
                resource.set_file_to_upload(zip_path)

                logger.info(f"Adding resource to HDX dataset: {zip_name}")
                dataset.add_update_resource(resource)
                dataset.update_in_hdx()
            except Exception as e:
                logger.error(f"Error exporting {category_name} to {fmt}: {str(e)}")
                raise

        # Final update and cleanup
        dataset.update_in_hdx()
        category_conn.execute(f"DROP TABLE IF EXISTS {table_name}")
        category_conn.close()

        logger.info(f"Successfully processed category: {category_name}")
        return category_name, "Success", zip_paths
    except Exception as e:
        logger.error(f"Error processing category {category_name}: {str(e)}", exc_info=True)
        try:
            category_conn.close()
        except:
            pass
        return category_name, f"Failed: {str(e)}", []

setup_duckdb(conn)

Configure DuckDB with optimal settings based on system resources
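
For reference, the statements issued here can be reproduced on a standalone DuckDB connection; a sketch with example values in place of the auto-detected memory limit and thread count:

import duckdb

con = duckdb.connect(":memory:")
for stmt in (
    "INSTALL spatial",
    "INSTALL httpfs",
    "LOAD spatial",
    "LOAD httpfs",
    "SET s3_region='us-west-2'",
    "PRAGMA memory_limit='4GB'",  # setup_duckdb derives this from system memory
    "PRAGMA threads=4",           # setup_duckdb uses MAX_THREADS - 1 (minimum 2)
):
    con.execute(stmt)
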

Source code in overture2hdx/app.py, lines 266-288:
def setup_duckdb(self, conn):
    """Configure DuckDB with optimal settings based on system resources"""
    setup_queries = [
        "INSTALL spatial",
        "INSTALL httpfs",
        "LOAD spatial",
        "LOAD httpfs",
        "SET s3_region='us-west-2'",
        f"PRAGMA memory_limit='{self.config.MEMORY_LIMIT_GB}GB'",
        f"PRAGMA threads={max(2, self.config.MAX_THREADS - 1)}",
        "PRAGMA enable_object_cache",
        "PRAGMA temp_directory='/tmp/duckdb_temp'",
    ]

    # Create temp directory if it doesn't exist
    os.makedirs("/tmp/duckdb_temp", exist_ok=True)

    for query in setup_queries:
        try:
            conn.execute(query)
            logger.debug(f"Executed DuckDB setup query: {query}")
        except Exception as e:
            logger.warning(f"Failed to execute DuckDB setup query '{query}': {str(e)}")