import os
import time

import pandas as pd
from sqlalchemy import MetaData, case, create_engine, update
def _map_entities(entities, _id):
    '''Map entity ID -> entity name for a list of entity records.'''
    _map = {}
    for entity in entities:
        _map[entity[_id]] = entity["name"]
    return _map
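# Illustrative example (hypothetical record, not from either database):
#   _map_entities([{"site_id": 1, "name": "site_a"}], "site_id") -> {1: "site_a"}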
def _generate_table_map(engine):
    '''Reflect the database schema and return a table-name -> Table mapping.'''
    meta_source = MetaData()
    meta_source.reflect(bind=engine)
    return {table.name: table for table in meta_source.sorted_tables}
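# Example usage (assumes the datapool schema): _generate_table_map(engine)["site"]
# yields the reflected sqlalchemy Table object for the "site" table.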
def read_meta(engine):
    '''Read the site, source and parameter tables as lists of row dicts.'''
    sites = pd.read_sql("SELECT * FROM site;", engine).to_dict(orient="records")
    sources = pd.read_sql("SELECT * FROM source;", engine).to_dict(orient="records")
    parameters = pd.read_sql("SELECT * FROM parameter;", engine).to_dict(orient="records")
    return sites, sources, parameters
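# Each list holds one dict per row, e.g. (illustrative):
#   sites == [{"site_id": 1, "name": "site_a", ...}, ...]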
def get_column(table, column_name):
    '''Return the Column of `table` with the given name.'''
    return table.columns[column_name]
def generate_map(dp1_entity, dp2_entity, _id):
    '''
    Generate a mapping from NEW (dp2) IDs to OLD (dp1) IDs.

    Names are matched with dots stripped (the new version apparently stores
    entity names without dots).
    '''
    dp1 = {
        entity_name: entity_id for entity_id, entity_name in _map_entities(dp1_entity, _id).items()
    }
    dp2 = {
        entity_name: entity_id for entity_id, entity_name in _map_entities(dp2_entity, _id).items()
    }
    # Only map entities whose ID actually changed between the two versions.
    _map = {}
    for entity_name in dp1:
        id_dp1 = dp1[entity_name]
        id_dp2 = dp2[entity_name.replace(".", "")]
        if id_dp1 != id_dp2:
            _map[id_dp2] = id_dp1
    return _map
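# Toy example (hypothetical entities): "site.b" kept its name modulo the dot,
# but its ID changed from 2 to 5, so exactly that pair ends up in the map:
#   generate_map([{"site_id": 2, "name": "site.b"}],
#                [{"site_id": 5, "name": "siteb"}], "site_id") -> {5: 2}
#
# The remapping below runs in two passes: rewriting IDs in place could collide
# while a target ID is still occupied by another row (e.g. swapping 1 <-> 2),
# so add_offset first moves every affected ID out of the way by +offset, and
# fix_ids then assigns the final dp1 values.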
def add_offset(engine, _map, table_name, column_name, offset=1000, execute=False):
    '''Build (and optionally run) the UPDATE that shifts all affected IDs by `offset`.'''
    tables = _generate_table_map(engine)
    table = tables[table_name]
    column = get_column(table, column_name)
    dp2_ids = tuple(_map)
    # case() with a list of whens targets SQLAlchemy 1.x; 2.x would need case(*whens).
    stmt = update(
        table
    ).where(
        column.in_(dp2_ids)
    ).values(
        {
            column_name: case(
                [(column == _id, _id + offset) for _id in dp2_ids]
            )
        }
    )
    if execute:
        st = time.time()
        print("Adding offset")
        # engine.begin() runs the UPDATE in a transaction that commits on success.
        with engine.begin() as conn:
            conn.execute(stmt)
        print(f"took: {time.time() - st}s")
    return stmt
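# The returned statement can be inspected before committing anything, e.g.:
#   stmt = add_offset(engine_dp_2, dp2_dp1_site, "site", "site_id", execute=False)
#   print(stmt)  # renders the UPDATE ... SET ... CASE ... WHERE ... IN ... SQL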
def fix_ids(engine, _map, table_name, column_name, offset=1000, execute=False):
    '''Build (and optionally run) the UPDATE that moves offset IDs to their final dp1 values.'''
    tables = _generate_table_map(engine)
    table = tables[table_name]
    column = get_column(table, column_name)
    dp2_ids_with_offset = {k + offset: v for k, v in _map.items()}
    stmt = update(
        table
    ).where(
        column.in_(tuple(dp2_ids_with_offset))
    ).values(
        {
            column_name: case(
                [
                    (column == offset_id, dp1_id)
                    for offset_id, dp1_id in dp2_ids_with_offset.items()
                ]
            )
        }
    )
    if execute:
        st = time.time()
        print("Fixing ids")
        # Same transactional pattern as in add_offset.
        with engine.begin() as conn:
            conn.execute(stmt)
        print(f"took: {time.time() - st}s")
    return stmt
def equal(dp1, dp2, key_field, value_field):
    '''Check that both versions map the same IDs to the same names.'''
    return {
        entry[key_field]: entry[value_field] for entry in dp1
    } == {
        entry[key_field]: entry[value_field] for entry in dp2
    }
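# Quick sanity checks on toy data (hypothetical records):
assert equal(
    [{"site_id": 1, "name": "site_a"}],
    [{"site_id": 1, "name": "site_a"}],
    "site_id", "name",
)
assert not equal(
    [{"site_id": 1, "name": "site_a"}],
    [{"site_id": 2, "name": "site_a"}],
    "site_id", "name",
)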
# Naming convention: *_1 -> old version, *_2 -> new version.
connection_datapool_1 = dict(
host = "HOST-ADDRESS-OLD",
user = "datapool",
database = "datapool",
password = "YOUR-VERY-SECRET-PASSWORD"
)
connection_datapool_2 = dict(
host = "HOST-ADDRESS-NEW",
user = "datapool",
database = "datapool",
password = "YOUR-VERY-SECRET-PASSWORD"
)
engine_dp_1 = create_engine(
'postgresql+psycopg2://{user}:{password}@{host}/{database}'.format(**connection_datapool_1)
)
engine_dp_2 = create_engine(
'postgresql+psycopg2://{user}:{password}@{host}/{database}'.format(**connection_datapool_2)
)
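# Optional fail-fast check (assumes both hosts are reachable): open and close a
# connection to each database before doing anything else.
for _engine in (engine_dp_1, engine_dp_2):
    _engine.connect().close()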
dp1_sites, dp1_sources, dp1_parameters = read_meta(engine_dp_1)
dp2_sites, dp2_sources, dp2_parameters = read_meta(engine_dp_2)
dp2_dp1_site = generate_map(dp1_sites, dp2_sites, "site_id")
dp2_dp1_source = generate_map(dp1_sources, dp2_sources, "source_id")
dp2_dp1_parameters = generate_map(dp1_parameters, dp2_parameters, "parameter_id")
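# Each map relates new IDs to old ones, e.g. (actual values depend on the data):
#   dp2_dp1_site == {<new site_id>: <old site_id>, ...}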
# Back up the new database's metadata tables once, before touching any IDs.
if not os.path.exists("backup_sites.csv"):
    pd.read_sql("SELECT * FROM site;", engine_dp_2).to_csv("backup_sites.csv", sep=";")
    pd.read_sql("SELECT * FROM source;", engine_dp_2).to_csv("backup_sources.csv", sep=";")
    pd.read_sql("SELECT * FROM parameter;", engine_dp_2).to_csv("backup_parameters.csv", sep=";")
# execute must be True in order to commit changes to the database!
offset = add_offset(engine_dp_2, dp2_dp1_site, "site", "site_id", execute=False)
fixed = fix_ids(engine_dp_2, dp2_dp1_site, "site", "site_id", execute=False)
dp2_sites_new, _, _ = read_meta(engine_dp_2)
print("sites consistent:", equal(dp1_sites, dp2_sites_new, "site_id", "name"))
# execute must be True in order to commit changes to the database!
offset = add_offset(engine_dp_2, dp2_dp1_parameters, "parameter", "parameter_id", execute=False)
fixed = fix_ids(engine_dp_2, dp2_dp1_parameters, "parameter", "parameter_id", execute=False)
_, _, dp2_parameters_new = read_meta(engine_dp_2)
print("parameters consistent:", equal(dp1_parameters, dp2_parameters_new, "parameter_id", "name"))
# execute must be True in order to commit changes to the database!
offset = add_offset(engine_dp_2, dp2_dp1_source, "source", "source_id", execute=False)
fixed = fix_ids(engine_dp_2, dp2_dp1_source, "source", "source_id", execute=False)
_, dp2_sources_new, _ = read_meta(engine_dp_2)
print("sources consistent:", equal(dp1_sources, dp2_sources_new, "source_id", "name"))