from pathlib import Path
import subprocess
# Local scratch directory used as a staging area for files copied from the
# old datapool host before they are re-uploaded to the new one.
tmp_dir = "/home/heinrich/datapool_related/datapool2/migration_sdwh2/tmp"
# *_1 -> old version
# *_2 -> new version
# Connection details for the old datapool host (version 1).
linux_user_account_datapool_1 = {
    "host": "152.88.206.68",
    "user": "foerstch",
    "location_landing_zone": "/nfsmount/landing_zone",
    "location_backup_landing_zone": "/nfsmount/landing_zone_backup",
}
# Connection details for the new datapool host (version 2).
linux_user_account_datapool_2 = {
    "host": "eaw-sdwh4.eawag.wroot.emp-eaw.ch",
    "user": "foerstch",
    "location_development_landing_zone": "/data/development/adding_conversion_scripts",
}
def _run_command(command):
result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
err, out = result.stderr.decode("utf-8"), result.stdout.decode("utf-8")
return out, err
def pass_command_datapool_1(cmd):
    """Execute shell command *cmd* on the old datapool host via ssh.

    Returns the (stdout, stderr) pair from _run_command.
    """
    target = "{user}@{host}".format(**linux_user_account_datapool_1)
    return _run_command(["ssh", target, cmd])
def pass_command_datapool_2(cmd):
    """Execute shell command *cmd* on the new datapool host via ssh.

    Returns the (stdout, stderr) pair from _run_command.
    """
    target = "{user}@{host}".format(**linux_user_account_datapool_2)
    return _run_command(["ssh", target, cmd])
def download_file(source_path, dest_path):
    """Copy *source_path* from the old datapool host to local *dest_path* via scp.

    Returns the (stdout, stderr) pair from _run_command.
    """
    remote = "{user}@{host}:{source_path}".format(
        source_path=source_path,
        **linux_user_account_datapool_1
    )
    return _run_command(["scp", remote, dest_path])
def upload_file(source_path, dest_path):
    """Copy local *source_path* to *dest_path* on the new datapool host via scp.

    Returns the (stdout, stderr) pair from _run_command.
    """
    remote = "{user}@{host}:{dest_path}".format(
        dest_path=dest_path,
        **linux_user_account_datapool_2
    )
    return _run_command(["scp", source_path, remote])
def make_download_commands_conversion(conversion_scripts, dest_path):
    """Build scp download descriptors for *conversion_scripts*.

    Each remote path is flattened into a single local file name by replacing
    '/' with the marker '-+-', so the original location can be reconstructed
    later by the matching upload step.
    """
    return [
        {
            "source_path": script,
            "dest_path": f"{dest_path}/" + script.replace("/", "-+-"),
        }
        for script in conversion_scripts
    ]
def make_upload_commands_conversion(download_commands):
    """Invert download descriptors into upload descriptors for the new host.

    The flattened local file becomes the upload source; the original remote
    path (with '-+-' markers expanded back to '/') becomes the destination,
    relocated from the old landing zone to the new development landing zone.
    """
    old_root = linux_user_account_datapool_1["location_landing_zone"]
    new_root = linux_user_account_datapool_2["location_development_landing_zone"]
    return [
        {
            "source_path": cmd["dest_path"],
            "dest_path": cmd["source_path"].replace("-+-", "/").replace(old_root, new_root),
        }
        for cmd in download_commands
    ]
# Collect every conversion.* script below the old landing zone, then derive
# the matching download (old host -> local tmp) and upload (local tmp -> new
# host) command descriptors.
cmd = 'find {location_landing_zone} -type f -name "conversion.*"'.format(**linux_user_account_datapool_1)
out, _ = pass_command_datapool_1(cmd)
# find output is newline-terminated, so drop the trailing empty element
conversion_scripts_source = out.split("\n")[:-1]
download_commands = make_download_commands_conversion(conversion_scripts_source, tmp_dir)
upload_commands = make_upload_commands_conversion(download_commands)
# Example of fixing source and source_type naming changes from one datapool version to the other.
def _fix_dest_path(_upload_commands):
"""
If sources or source_types have been renamed or names in source*.yaml do not
correspond to folder names paths might need fixing
"""
from copy import deepcopy
upload_commands = deepcopy(_upload_commands)
for idx, command in enumerate(upload_commands):
dst = Path(command["dest_path"])
#separation of path and conversion script to change path independently
parent = dst.parent
file = dst.name
destination = f"{parent}".replace(
".", # old source name contains .
"" # new version does not
).replace(
"air_temp_humid_SHT35", # old source_type name
"Airtemphumid_SHT35" # new source_type name
).replace(
"Turbidity_EndressHauser", # old source_type name
"Turbidity" # new source_type name
).replace(
"weather_station_WS700UMB", # old source_type name
"Weather_station_WS700_UMB" # new source_type name
)
destination += f"/{file}"
upload_commands[idx]["dest_path"] = destination
return upload_commands
# Fix naming changes between the two datapool versions.
# Rewrite upload destinations to account for renamed sources / source_types.
upload_commands = _fix_dest_path(upload_commands)
# You can check the subprocess executions via the stdout and stderr lists.
d_out, d_err = [], []  # to identify possible problems
# Download every conversion script from the old host into the local tmp dir.
for download_command in download_commands:
    out, err = download_file(**download_command)
    d_out.append(out), d_err.append(err)
u_out, u_err = [], []  # to identify possible problems
# Upload the downloaded scripts to the new host's development landing zone.
for upload_command in upload_commands:
    out, err = upload_file(**upload_command)
    u_out.append(out), u_err.append(err)
# Gather raw data files (*.raw.*) from the old host's backup landing zone.
cmd = 'find {location_backup_landing_zone} -type f -name "*.raw.*"'.format(**linux_user_account_datapool_1)
out, _ = pass_command_datapool_1(cmd)
# find output is newline-terminated, so drop the trailing empty element
raw_files = out.split("\n")[:-1]
def find_raw_for_conversion_scripts(raw_files, upload_commands, point_out_missing_raw_files=False):
    """Map each 'source_type/source' folder pair to at most two raw files.

    Parameters
    ----------
    raw_files : list of remote raw-file paths (strings)
    upload_commands : command dicts whose 'source_path' encodes the original
        remote path with '/' flattened to the '-+-' marker
    point_out_missing_raw_files : if True, print a note for every pair that
        has no matching raw file

    Returns a dict {"<source_type>/<source>": [up to two raw-file paths]}.
    """
    # Fix: the original computed an unused local `dest` from 'dest_path'.
    raw_map = {}
    for upload_command in upload_commands:
        source = Path(upload_command["source_path"].replace("-+-", "/"))
        # The conversion script lives in .../<source_type>/<source>/conversion.*
        source_type_source = f"{source.parent.parent.name}/{source.parent.name}"
        # Keep at most two raw files per pair — enough for a test conversion.
        matches = []
        for file in raw_files:
            if source_type_source in file:
                matches.append(file)
                if len(matches) == 2:
                    break
        raw_map[source_type_source] = matches
    if point_out_missing_raw_files:
        for key, val in raw_map.items():
            if not val:
                print(f"No raw data available for {key}")
    return raw_map
def remove_broken_files(raw_files):
    """Drop every raw file whose path contains the substring 'broken'."""
    return [path for path in raw_files if "broken" not in path]
def _remove_conversion_scripts_where_raw_files_missing(source_type_source_to_raw_map, upload_commands):
to_delete = []
for key, val in source_type_source_to_raw_map.items():
if not val:
for cmd in upload_commands:
source_path = cmd["source_path"].replace("-+-", "/")
if key in source_path:
to_delete.append(
f"rm {cmd['dest_path']}"
)
return to_delete
def make_download_commands_raw(source_type_source_to_raw_map, dest_path):
    """Build scp download descriptors for every raw file in the map.

    Remote paths are flattened into local file names by replacing '/' with
    the '-+-' marker, mirroring make_download_commands_conversion.
    """
    return [
        {
            "source_path": raw_file,
            "dest_path": f"{dest_path}/" + raw_file.replace("/", "-+-"),
        }
        for file_list in source_type_source_to_raw_map.values()
        for raw_file in file_list
    ]
def make_upload_commands_raw(download_commands_raw):
    """Invert raw-file download descriptors into upload descriptors.

    The destination is the original backup-landing-zone path relocated to the
    new development landing zone, with the final suffix stripped (e.g. a
    trailing compression extension).
    """
    backup_root = linux_user_account_datapool_1["location_backup_landing_zone"]
    dev_root = linux_user_account_datapool_2["location_development_landing_zone"]
    uploads = []
    for cmd in download_commands_raw:
        remote = cmd["source_path"].replace("-+-", "/").replace(backup_root, dev_root)
        uploads.append(
            {
                "source_path": cmd["dest_path"],
                "dest_path": Path(remote).with_suffix("").as_posix(),
            }
        )
    return uploads
# Discard files flagged as broken before mapping them to conversion scripts.
raw_files = remove_broken_files(raw_files)
# Map each conversion script's source_type/source pair to (at most two of)
# its raw data files.
source_type_source_to_raw_map = find_raw_for_conversion_scripts(
    raw_files,
    upload_commands,
    point_out_missing_raw_files=False
)
# a) remove commands for conversion scripts without raw files
# b) download commands
# c) upload commands
# a) 'rm' commands for uploaded conversion scripts that have no raw data
to_delete_conversion_scripts = _remove_conversion_scripts_where_raw_files_missing(
    source_type_source_to_raw_map,
    upload_commands
)
# b) raw files: old host -> local tmp dir
download_commands_raw = make_download_commands_raw(
    source_type_source_to_raw_map, tmp_dir
)
# c) raw files: local tmp dir -> new host (last suffix stripped)
upload_commands_raw = make_upload_commands_raw(
    download_commands_raw
)
d_out, d_err = [], []  # to identify possible problems
# Download the selected raw files from the old host.
for download_command in download_commands_raw:
    out, err = download_file(**download_command)
    d_out.append(out), d_err.append(err)
u_out, u_err = [], []  # to identify possible problems
# Upload the raw files to the new host's development landing zone.
for upload_command in upload_commands_raw:
    out, err = upload_file(**upload_command)
    u_out.append(out), u_err.append(err)
d_out, d_err = [], []  # to identify possible problems
# Delete conversion scripts on the new host that have no raw data to test with.
for script in to_delete_conversion_scripts:
    out, err = pass_command_datapool_2(script)
    d_out.append(out), d_err.append(err)