In [ ]:
from pathlib import Path
import subprocess

Specify a temporary folder to which conversion scripts and raw files will be downloaded.

In [ ]:
tmp_dir = "/home/heinrich/datapool_related/datapool2/migration_sdwh2/tmp"

Specify server credentials and necessary datapool paths

*_1 -> old version
*_2 -> new version
In [ ]:
# SSH/SCP connection details and relevant paths on the OLD datapool server.
linux_user_account_datapool_1 = dict(
    host = "152.88.206.68",  # old server, addressed by IP
    user = "foerstch",  # SSH user account
    location_landing_zone = "/nfsmount/landing_zone",  # active landing zone (conversion scripts)
    location_backup_landing_zone = "/nfsmount/landing_zone_backup"  # backup copy (raw files)
)


# SSH/SCP connection details and relevant paths on the NEW datapool server.
linux_user_account_datapool_2 = dict(
    host = "eaw-sdwh4.eawag.wroot.emp-eaw.ch",  # new server, addressed by hostname
    user = "foerstch",  # SSH user account
    location_development_landing_zone = "/data/development/adding_conversion_scripts"  # migration target
)
In [ ]:
def _run_command(command):
    result = subprocess.run(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    err, out = result.stderr.decode("utf-8"), result.stdout.decode("utf-8")
    return out, err

def pass_command_datapool_1(cmd):
    """Run shell command *cmd* on the old datapool host via ssh.

    Returns the (stdout, stderr) pair from the remote execution.
    """
    target = "{user}@{host}".format(**linux_user_account_datapool_1)
    return _run_command(["ssh", target, cmd])

def pass_command_datapool_2(cmd):
    """Run shell command *cmd* on the new datapool host via ssh.

    Returns the (stdout, stderr) pair from the remote execution.
    """
    target = "{user}@{host}".format(**linux_user_account_datapool_2)
    return _run_command(["ssh", target, cmd])

def download_file(source_path, dest_path):
    """Copy *source_path* from the old datapool host to local *dest_path* via scp.

    Returns the (stdout, stderr) pair of the scp invocation.
    """
    remote = "{user}@{host}:{source_path}".format(
        source_path=source_path,
        **linux_user_account_datapool_1
    )
    return _run_command(["scp", remote, dest_path])

def upload_file(source_path, dest_path):
    """Copy local *source_path* to *dest_path* on the new datapool host via scp.

    Returns the (stdout, stderr) pair of the scp invocation.
    """
    remote = "{user}@{host}:{dest_path}".format(
        dest_path=dest_path,
        **linux_user_account_datapool_2
    )
    return _run_command(["scp", source_path, remote])

def make_download_commands_conversion(conversion_scripts, dest_path):
    """Build download descriptors for the given conversion script paths.

    Each remote path is flattened into a single local file name by encoding
    every "/" as "-+-", so the original location can be reconstructed later.
    """
    return [
        {
            "source_path": script,
            "dest_path": f"{dest_path}/" + script.replace("/", "-+-"),
        }
        for script in conversion_scripts
    ]

def make_upload_commands_conversion(download_commands):
    """Derive upload descriptors from conversion-script *download_commands*.

    The flattened local file (download dest) becomes the upload source; the
    original remote path (decoded from "-+-") is rebased from the old landing
    zone onto the new development landing zone.
    """
    old_zone = linux_user_account_datapool_1["location_landing_zone"]
    new_zone = linux_user_account_datapool_2["location_development_landing_zone"]

    return [
        {
            "source_path": cmd["dest_path"],
            "dest_path": cmd["source_path"].replace("-+-", "/").replace(old_zone, new_zone),
        }
        for cmd in download_commands
    ]

List all conversion scripts in old landing zone

In [ ]:
# Find every conversion script ("conversion.*") in the old landing zone.
cmd = 'find {location_landing_zone} -type f -name "conversion.*"'.format(**linux_user_account_datapool_1)
out, _ = pass_command_datapool_1(cmd)
# find emits one path per line; [:-1] drops the empty entry after the final "\n"
conversion_scripts_source = out.split("\n")[:-1]

Set up download and upload commands

In [ ]:
# Stage the conversion scripts: download each into tmp_dir under a flattened
# name, then upload it to the new development landing zone.
download_commands = make_download_commands_conversion(conversion_scripts_source, tmp_dir)
upload_commands = make_upload_commands_conversion(download_commands)

Example for fixing source and source_type naming changes from one datapool version to the other!

In [ ]:
def _fix_dest_path(_upload_commands):
    """
    If sources or source_types have been renamed or names in source*.yaml do not 
    correspond to folder names paths might need fixing
    """
    from copy import deepcopy
    upload_commands = deepcopy(_upload_commands)
    
    for idx, command in enumerate(upload_commands):
        dst = Path(command["dest_path"])
        #separation of path and conversion script to change path independently 
        parent = dst.parent 
        file = dst.name
        
        destination = f"{parent}".replace(
            ".", # old source name contains .
            "" # new version does not
        ).replace(
            "air_temp_humid_SHT35", # old source_type name
            "Airtemphumid_SHT35" # new source_type name
        ).replace(
            "Turbidity_EndressHauser", # old source_type name
            "Turbidity" # new source_type name
        ).replace(
            "weather_station_WS700UMB", # old source_type name
            "Weather_station_WS700_UMB" # new source_type name
        )
        destination += f"/{file}"
        
        upload_commands[idx]["dest_path"] = destination
    return upload_commands

Fixing the naming changes

In [ ]:
upload_commands = _fix_dest_path(upload_commands)

Execute download and upload commands

You can check the subprocess execution results via the stdout and stderr lists.

In [ ]:
d_out, d_err = [], [] # collected stdout/stderr, to identify possible problems
for download_command in download_commands:
    out, err = download_file(**download_command)
    d_out.append(out), d_err.append(err)
In [ ]:
u_out, u_err = [], [] # collected stdout/stderr, to identify possible problems
for upload_command in upload_commands:
    out, err = upload_file(**upload_command)
    u_out.append(out), u_err.append(err)

List all raw files in backup landing zone

In [ ]:
# Find every raw data file ("*.raw.*") in the backup landing zone.
cmd = 'find {location_backup_landing_zone} -type f -name "*.raw.*"'.format(**linux_user_account_datapool_1)
out, _ = pass_command_datapool_1(cmd)
# [:-1] drops the empty entry after the final "\n" of the find output
raw_files = out.split("\n")[:-1]
In [ ]:
def find_raw_for_conversion_scripts(raw_files, upload_commands, point_out_missing_raw_files=False):
    """
    Map each "source_type/source" directory pair to up to two sample raw
    files found in *raw_files*.

    Parameters
    ----------
    raw_files : list of str
        Raw file paths (from the backup landing zone).
    upload_commands : list of dict
        Upload descriptors whose "source_path" encodes the original remote
        path with "/" replaced by "-+-".
    point_out_missing_raw_files : bool
        When True, print every source_type/source for which no raw file
        was found.

    Returns
    -------
    dict
        "source_type/source" -> list of at most two matching raw file paths.
    """
    _raw_files = {}
    for upload_command in upload_commands:
        # decode the flattened local name back into the original remote path
        source = Path(upload_command["source_path"].replace("-+-", "/"))
        # fix: removed unused local `dest` (dead code in the original)

        # .../<source_type>/<source>/conversion.* -> "source_type/source"
        source_type_source = f"{source.parent.parent.name}/{source.parent.name}"

        # two sample files per source are enough for testing the migration
        _raw_files[source_type_source] = []
        for file in raw_files:
            if len(_raw_files[source_type_source]) == 2:
                break

            if source_type_source in file:
                _raw_files[source_type_source].append(
                    file
                )

    if point_out_missing_raw_files:
        for key, val in _raw_files.items():
            if not val:
                print(f"No raw data available for {key}")
    return _raw_files

def remove_broken_files(raw_files):
    """Return *raw_files* without any path whose name contains "broken"."""
    # fix: `x not in y` is the idiomatic form of `not x in y`
    return [file for file in raw_files if "broken" not in file]

def _remove_conversion_scripts_where_raw_files_missing(source_type_source_to_raw_map, upload_commands):
    to_delete = []
    for key, val in source_type_source_to_raw_map.items():
        if not val:
            for cmd in upload_commands:
                source_path = cmd["source_path"].replace("-+-", "/")
                if key in source_path:
                    
                    to_delete.append(
                        f"rm {cmd['dest_path']}"
                    )
    return to_delete

def make_download_commands_raw(source_type_source_to_raw_map, dest_path):
    """
    Build download descriptors for every raw file in the map, flattening
    each remote path into a local file name by encoding "/" as "-+-".
    """
    return [
        {
            "source_path": file,
            "dest_path": f"{dest_path}/" + file.replace("/", "-+-"),
        }
        for raw_files in source_type_source_to_raw_map.values()
        for file in raw_files
    ]

def make_upload_commands_raw(download_commands_raw):
    """
    Derive upload descriptors for raw files: decode the remote path, rebase
    it from the old backup landing zone onto the new development landing
    zone, and drop the final suffix (e.g. a trailing ".gz").
    """
    backup_zone = linux_user_account_datapool_1["location_backup_landing_zone"]
    dev_zone = linux_user_account_datapool_2["location_development_landing_zone"]

    upload_commands = []
    for cmd in download_commands_raw:
        rebased = cmd["source_path"].replace("-+-", "/").replace(backup_zone, dev_zone)
        # with_suffix("") strips only the last extension of the file name
        target = Path(rebased).with_suffix("")
        upload_commands.append(
            {
                "source_path": cmd["dest_path"],
                "dest_path": target.as_posix(),
            }
        )
    return upload_commands

Removing broken raw files

In [ ]:
raw_files = remove_broken_files(raw_files)

Map sources/source_types to raw files

In [ ]:
# Select up to two sample raw files per source_type/source combination.
source_type_source_to_raw_map = find_raw_for_conversion_scripts(
    raw_files, 
    upload_commands, 
    point_out_missing_raw_files=False
)

Generate raw file commands

a) remove commands for conversion scripts without raw files
b) download commands
c) upload commands
In [ ]:
# Conversion scripts without any raw data cannot be tested on the new host:
# prepare "rm" commands to remove them from the new landing zone again.
to_delete_conversion_scripts = _remove_conversion_scripts_where_raw_files_missing(
    source_type_source_to_raw_map, 
    upload_commands
)
In [ ]:
# Build commands to download the selected raw files into the local tmp folder.
download_commands_raw = make_download_commands_raw(
    source_type_source_to_raw_map, tmp_dir
)
In [ ]:
# Build commands to upload the raw files to the new development landing zone.
upload_commands_raw = make_upload_commands_raw(
    download_commands_raw
)

Execute commands

In [ ]:
d_out, d_err = [], [] # collected stdout/stderr, to identify possible problems
for download_command in download_commands_raw:
    out, err = download_file(**download_command)
    d_out.append(out), d_err.append(err)
In [ ]:
u_out, u_err = [], [] # collected stdout/stderr, to identify possible problems
for upload_command in upload_commands_raw:
    out, err = upload_file(**upload_command)
    u_out.append(out), u_err.append(err)
In [ ]:
d_out, d_err = [], [] # collected stdout/stderr, to identify possible problems
# run the prepared "rm" commands on the new host to drop untestable scripts
for script in to_delete_conversion_scripts:
    out, err = pass_command_datapool_2(script)
    d_out.append(out), d_err.append(err)
In [ ]:
 
In [ ]: