diff --git a/exposureinitializer/exposureinitializer.py b/exposureinitializer/exposureinitializer.py index e399dbf91d24648b00d16257f80d92d62f6437d7..8e2dc6bfd44cecee47e62534d06d692c359abf72 100644 --- a/exposureinitializer/exposureinitializer.py +++ b/exposureinitializer/exposureinitializer.py @@ -22,8 +22,11 @@ import sys import argparse import configparser import datetime +import glob +import csv from databaselib.database import PostGISDatabase from exposurelib.database import SpatiaLiteExposure, PostGISExposure +from exposurelib.utils import add_occupancy_to_taxonomy # Add a logger printing error, warning, info and debug messages to the screen @@ -155,6 +158,212 @@ class ExposureInitializer: self.exposure_db.connection.commit() tile_db.close() + @staticmethod + def add_asset_to_dict(asset_dict, taxonomy_id, asset): + """ + Adds an asset to an asset list by first searching for an asset with the same taxonomy in + the list. If found, the asset is added to the existing one, otherwise the list is + extended by the new asset. + + Args: + asset_dict (dict of dicts): + Dictionary of assets. + taxonomy_id (int): + ID of the taxonomy (refers to `id` in table `Taxonomy`) + asset (dict): + Asset to be added to `asset_dict` under the key `taxonomy_id`. + Each asset contains of: + number : number of buildings or proportion of building + structural: structural costs of the asset + night : nighttime population of the asset + + Returns: + The asset dictionary with the new asset added + """ + + if asset_dict.get(taxonomy_id, None) is None: + asset_dict[taxonomy_id] = { + "number": asset["number"], + "structural": asset["structural"], + "night": asset["night"], + } + else: + asset_dict[taxonomy_id]["number"] += asset["number"] + asset_dict[taxonomy_id]["structural"] += asset["structural"] + asset_dict[taxonomy_id]["night"] += asset["night"] + return asset_dict + + def process_district(self, boundary_id, country_iso_code, asset_dict): + """ + Processes all assets provided in `asset_list` for one district with `boundary_id`. + First, the asset list is reduced to unique taxonomy entries by summing up all key + values per taxonomy. Second, all tiles within the boundary with `boundary_id` are + identified and their proportion of the built area compared to the sum of the built area + of the selected tiles is computed. Third, for each tile with a built area a + reference entity is created and the reduced asset list is inserted as reference assets + with all values multiplied by the proportion factor, herby distributing all assets of + the exposure model proportionally on the built area to the tiles (entities). + + Args: + boundary_id (str): + ID of the district boundary + country_iso_code (str): + ISO 3166-1 alpha-3 code of the country + asset_dict (dict): + Dictionary of asset values + """ + + # Get all tiles within the boundary, estimate their built_area proportion and + # create the EntityReference and AssetReference datasets + logger.info("Query the built-area proportion per tile") + + # Query explanation: Subquery `A` selects all tiles and is joined with subquery `B` + # selecting the boundary, resulting in subquery `T` delivering all tiles for which the + # centroid is located within the boundary. This subquery `T` is then used twice, first + # in subquery `SumQuery` to estimate the sum of the built-area and the cross-joined + # query retrieving the built-area size per tile to compute the built-area proportion. + if isinstance(self.exposure_db, SpatiaLiteExposure): + where_clause = "WHERE built_area_size IS NOT NULL" + else: + where_clause = "WHERE A.geometry && B.border AND built_area_size IS NOT NULL" + sql_statement = f""" + WITH T AS + ( + SELECT quadkey, built_area_size + FROM + ( + SELECT quadkey, built_area_size, country_iso_code, + {self.exposure_db.geometry_field} + FROM {self.exposure_db.tile_view} + ) AS A + INNER JOIN + ( + SELECT {self.exposure_db.geometry_field} AS border + FROM {self.exposure_db.boundary_table} + WHERE {self.exposure_db.boundary_id_field}= '{boundary_id}' + ) AS B + ON ST_Contains(B.border, ST_Centroid(A.{self.exposure_db.geometry_field})) + {where_clause} + ) + SELECT T.quadkey, T.built_area_size/SumQuery.total_sum as proportion + FROM T + CROSS JOIN + ( + SELECT SUM(T.built_area_size) as total_sum FROM T + ) AS SumQuery + """ + self.exposure_db.cursor.execute(sql_statement) + tiles = self.exposure_db.cursor.fetchall() + logger.info("Insert assets for each tile in boundary %s" % boundary_id) + for quadkey, proportion in tiles: + # Check if entity exists in EntityReference and create if necessary + entity_id = self.exposure_db.get_reference_entity_id(quadkey) + if entity_id is None: + entity_id = self.exposure_db.insert_reference_entity(quadkey, country_iso_code) + + # Add the respective assets by proportion of built area to `AssetReference` + for taxonomy_id, value in asset_dict.items(): + sql_statement = """ + INSERT INTO AssetReference + (entity_id, taxonomy_id, number, structural, night) + VALUES (%d, %d, %f, %f, %f) + """ % ( + entity_id, + int(taxonomy_id), # taxonomy_id + float(value["number"]) * proportion, # number + float(value["structural"]) * proportion, # structural + float(value["night"]) * proportion, # night + ) + self.exposure_db.cursor.execute(sql_statement) + self.exposure_db.connection.commit() + + def import_exposure(self, exposure_model_search_pattern, country_iso_code): + """ + Imports an OpenQuake-compatible aggregated exposure model into baseline entities and + baseline assets. The function reads in all exposure models matching the search + pattern. Each file is read line by line assuming that all assets of one location are + stored consecutively in the file. The function collects all assets per district and + passes them to the `process_district` function to be processed and stored in the + database. It also collects all assets of the entire country, normalizes them and stores + them as the country-average asset distribution in `AssetCountry`. + + Args: + exposure_model_search_pattern (str): + Search pattern to identify all necessary exposure filepaths. + country_iso_code (str): + ISO 3166-1 alpha-3 code of the country. + """ + + country_asset_dict = {} + # Iterate through all given exposure files + for exposure_filepath in glob.glob(exposure_model_search_pattern): + logger.info(f"Processing {exposure_filepath}") + csv_reader = csv.DictReader(open(exposure_filepath), delimiter=",") + # Sort the exposure file by boundary ID to have all assets of one district being + # listed consecutively to avoid listing same taxonomies multiple times + sorted_exposure = sorted(csv_reader, key=lambda line: line["BOUNDARY_ID"]) + # Prepare the control variables + last_boundary_id = None + location_count = 0 + asset_dict = {} + boundary_id = None + for row in sorted_exposure: + # Check if the line starts the asset list of a new location + if not (last_boundary_id == row["BOUNDARY_ID"]): + if location_count > 0: + pass + self.process_district(boundary_id, country_iso_code, asset_dict) + location_count += 1 + boundary_id = row["BOUNDARY_ID"] + asset_dict = {} # Reset the location-based asset dictionary + last_boundary_id = row["BOUNDARY_ID"] + + # Read in an asset + # Add the occupancy to the taxonomy string and store it in the database + taxonomy = add_occupancy_to_taxonomy( + row["TAXONOMY"], str(row["OCCUPANCY"]).upper() + ) + if not self.exposure_db.taxonomy_string_exists(taxonomy): + self.exposure_db.insert_taxonomy(taxonomy) + taxonomy_id = self.exposure_db.get_taxonomy_id(taxonomy) + asset = { + "number": float(row["BUILDINGS"]), + "structural": float(row["COST_STRUCTURAL_EUR"]), + "night": float(row["OCCUPANTS_PER_ASSET_NIGHT"]), + } + # Store the asset in a location-based list and country-based list + asset_dict = self.add_asset_to_dict(asset_dict, taxonomy_id, asset) + country_asset_dict = self.add_asset_to_dict( + country_asset_dict, taxonomy_id, asset + ) + + self.process_district(boundary_id, country_iso_code, asset_dict) + + logger.info("Assign the country-average assets") + # Normalize the country-average asset distribution + sum_number = 0 + for taxonomy_id, value in country_asset_dict.items(): + sum_number += value["number"] + # Assign the country-average asset distribution to the country assets + for taxonomy_id, value in country_asset_dict.items(): + sql_statement = """ + INSERT INTO AssetCountry + (country_iso_code, taxonomy_id, number, number_normalized, structural, + structural_normalized, night, night_normalized) + VALUES ('%s', %d, %f, %f, %f, %f, %f, %f) + """ % ( + country_iso_code, + int(taxonomy_id), + float(value["number"]), + float(value["number"]) / sum_number, + float(value["structural"]), + float(value["structural"]) / sum_number, + float(value["night"]), + float(value["night"]) / sum_number, + ) + self.exposure_db.cursor.execute(sql_statement) + self.exposure_db.connection.commit() + def command_line_interface(): """ @@ -274,7 +483,7 @@ def command_line_interface(): else: distributor.exposure_db.clean_up_reference(country_iso_code) distributor.exposure_db.clean_up_asset_country(country_iso_code) - # distributor.import_exposure(exposure_model, country_iso_code) + distributor.import_exposure(exposure_model, country_iso_code) # distributor.process_gaps_in_exposure(country_iso_code, country_boundary_id) distributor.exposure_db.commit_and_close()