nc2csv.py 33.2 KB
Newer Older
1 2 3 4 5
#!/apps/base/python3/bin/python3

"""
Author: Michael Giansiracusa
Email: giansiracumt@ornl.gov
6
Version: 1.1.0
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43

Purpose:
    This module processes NetCDF files into ASCII CSV format.

Arguments
    required arguments
        in_dir : str
            fully qualified path to files that will be processed.
        out_dir : str
            fully qualified path where to put output files.
        var_list : str
            fully qualified path to a file that contains all
            variables to extract, each on its own line.
        file_list : str
            fully qualified path to a file that contains all
            files in the in_dir to process.
    optional arguments
        merged_output : bool flag
            output files merged into one if provided
        replaceMissing : bool flag
            replace missing values (-9999, -9995) with ""/None
        DQRfilter : str
            filter data by variable using dqr status.
            should be suspect, incorrect, or suspect,incorrect


Output:
    One cdf file for each file in input directory is put in the out_dir or
    one merged file with all rows and columns converted from input cdf files.
    The first row of each file will be the column names; the first 3 column
    names will always be time_stamp, basetime, and time_offset. The time_stamp
    column is calculated by adding the basetime and time_offset and converting
    to a calendar date of the form YYYY-mm-dd HH:MM:SS
"""

import os
import sys
44 45 46 47 48
import logging
import pandas
import requests
import multiprocessing
from functools import partial
49
from datetime import datetime
50 51 52 53
from glob import glob
from logging.handlers import RotatingFileHandler

# NOTE(review): despite the original note that this "would override the logging
# configuration in netcdf2ascii, uncomment to test just this script", the
# basicConfig call below is active — comment it out when importing from netcdf2ascii.
logging.basicConfig(level=logging.DEBUG,
                    format='%(asctime)s %(levelname)s (%(funcName)s %(lineno)s) : %(message)s',
                    datefmt='%Y-%m-%d %H:%M')

# Module-level logger used by every function in this file.
nc2csv_logger = logging.getLogger("nc2csv")
# This was needed to import the netCDF4 library.
# NOTE(review): hard-coding a python3.5 site-packages path is fragile — breaks
# on interpreter upgrades; confirm this is still required on the deploy host.
site_packages = "/apps/base/python3.5/lib/python3.5/site-packages"
sys.path.insert(0, site_packages)
try:
    import netCDF4
    # from netCDF4 import Dataset
except (ImportError, ModuleNotFoundError) as e:
    # Import failure is logged but not fatal here; any later use of the
    # netCDF4 name will raise NameError.
    nc2csv_logger.critical("Error importing netCDF4.\nThe program thought it would be in: {}\n{}".format(site_packages, e))
import argparse
67

68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100

help_description = """
This program converts netcdf files into csv files.
"""

example = """
EXAMPLE: py3 netcdf2ascii.py
            --in_dir "/path_to/input"
            --out_dir "/path_to/output"
            --var_list "/path_to/var_list.txt"
            --file_list "/path_to/file_list.txt"
"""

""" setup_arguments summary

Author: Michael Giansiracusa
Email: giansiracumt@ornl.gov

Purpose:
    Parse arguments from command line.

Args:
    None

Returns:
    args object with attributes
            in_dir : str
            out_dir : str
            var_list : str
            file_list : str
"""

def parse_arguments():
    """Build the command line parser and return the parsed arguments.

    :return (argparse.Namespace): Object with arguments and flags as dot properties.
    """
    parser = argparse.ArgumentParser(description=help_description, epilog=example,
                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)

    required = parser.add_argument_group("required arguments")
    optional = parser.add_argument_group("optional arguments")

    # valid_argument validates that each required path exists before the run starts.
    required.add_argument("-i", "--indir", type=valid_argument, dest="in_dir", required=True,
                          help="The directory where input files are.")
    required.add_argument("-o", "--outdir", type=valid_argument, dest="out_dir", required=True,
                          help="The directory to put output file in.")
    required.add_argument("-f", "--filelist", type=valid_argument, dest="file_list", required=True,
                          help="The name of a file that contains the names of files to convert.")
    required.add_argument("-v", "--varlist", type=valid_argument, dest="var_list", required=True,
                          help="The name of a file that contains the names of variables to extract.")

    optional.add_argument("--mergedoutput", action="store_true", dest="merged_output",
                          help="merge files into one, possibly with max size")
    # DQR filter can take the following forms; --DQRfilter nofilter,incorrect,suspect
    optional.add_argument("--DQRfilter", type=str, dest="DQRfilter", default="",
                          help="filter out data with dqr, <suspect> or <incorrect> or <suspect,incorrect>")
    optional.add_argument("-mp", type=int, dest="maxProcesses", default=3,
                          help="number of processes in multiprocessing pool")

    return parser.parse_args()


133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163
def nc2csv(args: argparse.Namespace) -> int:
    """Main workflow: prepare output, load inputs, query DQRs, convert, merge.

    :param args (argparse.Namespace): Object with arguments and flags as dot properties
    :return (int): exit status
    """
    nc2csv_logger.debug('args: {}'.format(args))
    args.out_dir = create_output_dir(args.out_dir)

    # Rotating file handler captures warnings and above into a conversion
    # report that lives next to the converted output.
    log_file = os.path.join(args.out_dir, 'conversion_report.txt')
    report_handler = RotatingFileHandler(log_file, mode='a', maxBytes=1024 * 1024, backupCount=1)
    report_handler.setLevel(logging.WARNING)
    nc2csv_logger.addHandler(report_handler)

    nc2csv_logger.info('Parsing variable list: {}'.format(args.var_list))
    args.variables = parse_varlist(args.var_list)  # also validates var_list
    nc2csv_logger.debug('variables: {} - {}'.format(type(args.variables), args.variables))

    nc2csv_logger.info('Loading file list: {}'.format(args.file_list))
    args.files = load_file_list(args.file_list, args.in_dir)
    nc2csv_logger.debug('files: {}'.format(args.files))

    nc2csv_logger.info('Parsing datastream.')
    args.datastream = parse_datastream(args.files)
    nc2csv_logger.debug('Datastream = {}'.format(args.datastream))

    nc2csv_logger.info('Getting dqr timeblocks.')
    args.dqr_timeblocks = parse_dqrs(args.datastream, args.variables, args.out_dir, args.DQRfilter)
    write_dqr_reference(args.dqr_timeblocks, args.out_dir)

    # Done here so the date_time variable is in the varlist for indexing and dqr filtering.
    args.variables = format_variables(args.variables)

    nc2csv_logger.info('Starting process pool.')
    process_pool(args)
    # For debugging (multiprocessing masks some errors), run serially instead:
    # for file_name in args.files:
    #     process_one_file(args.variables, args.datastream, args.out_dir, args.DQRfilter, args.dqr_timeblocks, file_name)

    nc2csv_logger.info('Dumping headers for {}'.format(args.datastream))
    dump_header(args)

    if args.merged_output:
        nc2csv_logger.info('Merging output.')
        merge_output(args)

    return 0

def valid_argument(arg: str) -> str:
    """argparse ``type`` callback: ensure the given path exists.

    :param arg (str): One of the required input arguments.
    :return (str): The input argument unchanged when the path exists.
    :raises argparse.ArgumentTypeError: When the path does not exist.
    """
    if not os.path.exists(arg):
        raise argparse.ArgumentTypeError("Input argument does not exist: {}".format(arg))
    return arg

def parse_varlist(var_list: str) -> list:
    """Parse a file listing one variable name per line into a list of names.

    Fixes: blank lines no longer produce empty-string "variables" (which would
    later trigger bogus lookups and DQR queries), and names are stripped of all
    surrounding whitespace so CRLF line endings do not leave a trailing '\\r'.

    :param var_list (str): The name of the file containing a list of variables to extract.
    :return (list): Variable names as strings, or ['all'] when the list file is
        missing or empty (meaning: extract every variable from every file).
    """

    def valid_varlist(var_list: str) -> str:
        """Special method to handle issue when varlist isn't created or is empty.
        This can be caused by a failed database insert from Data Discovery.
        Talk to Kyle Dumas or Joseph Olat to fix this issue.

        :param var_list (str): The var_list input argument.
        :return (str): The path when it is an existing non-empty file, else '' (falsy).
        """
        if var_list and os.path.exists(var_list):
            if os.stat(var_list).st_size != 0:
                return var_list
            nc2csv_logger.warning("{} is empty {}".format(var_list, os.stat(var_list).st_size))
            return ''
        nc2csv_logger.warning("{} DNE".format(var_list))
        return ''

    if not valid_varlist(var_list):
        # 'all' flags extracting every variable from every file.
        return ['all']

    variables = []
    with open(var_list) as open_var_list:
        for line in open_var_list:
            name = line.strip()
            if name:  # skip blank lines
                variables.append(name)
    return variables

def create_output_dir(out_dir: str) -> str:
    """Create (if needed) the 'ascii-csv' subdirectory inside the ADRS output
    directory which contains the netcdf files.

    :param out_dir (str): The output directory for ADRS which is managed by Joseph Olat.
    :return (str): Fully qualified path to the ascii-csv output directory.
    """
    csv_dir = os.path.join(out_dir, 'ascii-csv')
    # exist_ok makes repeated runs against the same output directory safe.
    os.makedirs(csv_dir, exist_ok=True)
    return csv_dir

def load_file_list(file_list: str, in_dir: str) -> tuple:
    """Read the list of file names and return a sorted tuple of existing paths.

    :param file_list (str): Path to file containing list of filenames to be worked on.
    :param in_dir (str): Directory in which the listed files are expected to live.
    :return (tuple): Sorted tuple of fully qualified paths; names that do not
        resolve to an existing file are silently dropped.
    """
    with open(file_list) as names:
        candidates = [os.path.join(in_dir, line.strip("\n")) for line in names]
    # Sorting gives easy access to begin and end dates through file name parsing.
    return tuple(sorted(path for path in candidates if os.path.isfile(path)))

def parse_datastream(files: tuple) -> str:
    """Derive the datastream name (e.g. 'sgpmetE13.b1') from the first existing file.

    :param files (tuple): Fully qualified paths to the input files.
    :return (str): Datastream name, or 'default_ds' when none could be parsed.
    """
    for path in files:
        if os.path.isfile(path):
            # File names start with <datastream>.<level>. — keep the first two parts.
            return ".".join(os.path.basename(path).split('.')[:2])
    nc2csv_logger.warning("ERROR: could not parse datastream from files.")
    return 'default_ds'

def parse_dqrs(datastream: str, variables: str, out_dir: str, DQRfilter: str) -> dict:
    """Query the DQR web service api and build a per-variable map of dqr time blocks.
    Each key is a variable name and each value is a tuple of tuples holding
    start datetime, end datetime, dqr number, dqr level (incorrect/suspect),
    datastream info, and comment:
    Return = dict('var1': ((str(YYYYmmdd.HHMMSS), str(YYYYmmdd.HHMMSS), str(dqr), str(ds info), str(comment)))

    :param datastream (str): The name of the datastream; e.g. 'sgpmetE13.b1'
    :param variables (tuple): Variable names as strings.
    :param out_dir (str): Fully qualified path to modified output directory.
    :param DQRfilter (str): DQR level to filter out, incorrect, suspect, or incorrect,suspect
    :return (dict): key=varname, value=tuple of tuples containing dqr info;
        the special key 'all' is True when the service was unreachable.
    """

    def get_timeblocks(url: str) -> tuple:
        """Make one api call and format the response into a tuple of tuples.

        :param url (str): http url to make request api call
        :return (tuple): tuple of tuples containing all data quality reports
        :raises ConnectionError: when the web server answers 500.
        """
        # query web service https://www.archive.arm.gov/dqrws/
        response = requests.get(url)
        if response.status_code == 500:
            raise ConnectionError
        timeblocks = []
        if response.status_code == 200:
            # Each non-empty line of the payload is one pipe-delimited dqr record.
            for record in response.content.decode('utf-8').split('\n'):
                nc2csv_logger.debug('line: {}'.format(record))
                if record:
                    timeblocks.append(tuple(record.split('|')))
        return tuple(timeblocks)

    if DQRfilter and 'suspect' not in DQRfilter and 'incorrect' not in DQRfilter:
        nc2csv_logger.warning("DQR filter parameter not recognized. No DQR filtering.")
        return dict()

    # Base url for the dqr web service with query features added.
    base_url = "".join(("https://adc.arm.gov/dqrws/ARMDQR?datastream=", datastream,
                        "&responsetype=delimited&searchmetric=", DQRfilter,
                        "&timeformat=YYYYMMDD.hhmmss&dqrfields=starttime,endtime,dqrid,metric,subject"))

    # 'all' == True means the service was unreachable: downstream skips filtering.
    dqr_results = {'all': False}
    for var in variables:
        var_url = "".join((base_url, "&varname=", var))
        nc2csv_logger.debug("getting dqr timeblocks for {}\n\t{}".format(var, var_url))
        try:
            dqr_results[var] = get_timeblocks(var_url)
        except ConnectionError:
            # If dqrws is down then mark for all variables and stop querying.
            nc2csv_logger.error("Error with dqr web service. Defaulting to no filtering.")
            dqr_results['all'] = True
            break

    nc2csv_logger.debug("dqr_results: {}".format(dqr_results))
    return dqr_results

def write_dqr_reference(dqr_timeblocks: dict, out_dir: str) -> None:
    """Write a dqr report based on the results of the dqr web service api call.
    This is for one datastream only and has variable level resolution. For each
    variable there could be 0 or more dqrs for unique time ranges.

    Bug fix: the "No dqr for <var>." line was previously written for EVERY
    variable, even ones that did have dqr records, because the no-dqr branch
    was a second independent ``if``; it is now the ``else`` of the record loop.

    :param dqr_timeblocks (dict): key=variable, value=tuple of tuples (one per dqr record).
    :param out_dir (str): The fully qualified path to the output directory.
    :return (None): Writes dqr_results.txt, no return value.
    """
    dqr_output = os.path.join(out_dir, 'dqr_results.txt')
    with open(dqr_output, 'w') as of:
        for key in dqr_timeblocks.keys():
            if key == 'all':
                # 'all' is a bookkeeping flag set by parse_dqrs, not a variable.
                continue
            if dqr_timeblocks[key]:
                for dqr_record in dqr_timeblocks[key]:
                    nc2csv_logger.debug("{} - {}".format(key, dqr_record))
                    of.write("{} - {}\n".format(key, dqr_record))
            else:
                of.write("No dqr for {}.\n".format(key))
                nc2csv_logger.debug("No dqr for {}.".format(key))
    nc2csv_logger.debug(" -- finished writing dqr_results.")

def format_variables(variables: list) -> list:
    """Put the time columns at the front of the variable list in the fixed
    order date_time, base_time, time_offset, inserting them when absent.

    date_time does not exist in the input files; it is calculated later from
    base_time + time_offset. Replaces the previous broad
    ``try/except Exception/finally`` remove-then-insert with an explicit
    membership check — behavior is unchanged.

    :param variables (list): Variables as strings to reorganize (mutated in place).
    :return (list): The same list with the three time columns at the beginning.
    """
    # date_time goes first; this is where the calculated calendar time lands.
    variables.insert(0, "date_time")
    # Pin base_time/time_offset to fixed slots, de-duplicating them when the
    # user already selected them in the varlist.
    for index, time_var in ((1, "base_time"), (2, "time_offset")):
        if time_var in variables:
            variables.remove(time_var)
        variables.insert(index, time_var)
    return variables

def process_pool(args: argparse.Namespace) -> None:
    """Start a pool of worker processes, prep a partial function with all the
    static arguments, and map each file name onto the pool.

    Bug fix: the pool was previously created inside the ``try`` whose
    ``finally`` referenced it, so a failure in ``Pool()`` raised NameError in
    the cleanup. The pool is now created before the try so close/join only run
    when it actually exists.

    :param args (argparse.Namespace): Object with input arguments and flags as dot properties.
    :return (None): No return.
    """
    pool = multiprocessing.Pool(processes=args.maxProcesses)
    try:
        # Bind the per-run constants; only the file name varies per task.
        partial_function = partial(process_one_file, args.variables, args.datastream,
                                   args.out_dir, args.DQRfilter, args.dqr_timeblocks)
        pool.map(partial_function, args.files)
    except Exception as e:
        # Worker failures are reported but do not abort the overall run.
        nc2csv_logger.warning("Multiprocessing exception: {}".format(e))
    finally:
        pool.close()
        pool.join()
396

397 398 399 400 401 402 403 404 405 406 407 408 409 410 411
def process_one_file(variables: list, datastream: str, out_dir: str,
                     DQRfilter: str, dqr_ranges: dict, file_name: str) -> None:
    """Conversion algorithm from cdf file to ascii csv.

    Load the file with netCDF4, build a pandas DataFrame holding the extracted
    variables (2+ dimensional variables are skipped), optionally mask
    DQR-flagged time ranges with the missing value attribute, and save to csv.

    :param variables (list): A list of variable names whose data will be extracted.
    :param datastream (str): Datastream name for saving output file.
    :param out_dir (str): Modified output directory for file output.
    :param DQRfilter (str): String representing levels of dqr filtering.
    :param dqr_ranges (dict): Dict (keys=var names) containing a tuple of tuples with dqr info for each variable.
    :param file_name (str): File being worked on by current instantiation of this process.
    :return (None): Writes one csv file; returns early with no output on time OverflowError.
    """
    nc2csv_logger.debug("Begin processing <- {}".format(file_name))
    rootgrp = netCDF4.Dataset(file_name, 'r')
    # If the flag for all variables is set, build the var list from the file itself.
    if "all" in variables:
        nc2csv_logger.debug("populating list will all variables in current netCDF file")
        variables = list()
        for key in rootgrp.variables.keys():
            if key not in variables:
                variables.append(key)
        # Reorder so date_time/base_time/time_offset lead, as for explicit varlists.
        variables = format_variables(variables)

    # Pandas DataFrame accumulates the columns of the new file.
    df = pandas.DataFrame()
    nc2csv_logger.debug("created empty dataframe")
    for var in variables:
        dimension = None  # NOTE(review): assigned but never read below

        # Create the calculated calendar date_time column.
        if var == "date_time":
            # FIXME This is the old way, much slower. the num2date can throw an index out of bounds.
            # temp_datetimes = []
            #     for val in rootgrp["time_offset"][:]:
            #         temp_timestamp = np.asscalar(val + rootgrp["base_time"][0])
            #         temp_datetime = datetime.utcfromtimestamp(temp_timestamp).strftime("%Y-%m-%d %H:%M:%S")
            #         temp_datetimes.append(temp_datetime)
            #     df["date_time"] = temp_datetimes
            #     df.set_index("date_time", inplace=True)
            #     print("process_one_file: created date_time column")

            # Vectorized conversion of the time axis to timestamp objects.
            # No explicit base_time reference is needed because the epoch is
            # carried in the time_offset variable's 'units' attribute.
            try:
                df["date_time"] = netCDF4.num2date(rootgrp.variables['time_offset'][:],
                                                   rootgrp.variables['time_offset'].units,
                                                   calendar='standard')
            except OverflowError as ofe:
                # Out-of-range time values: skip the whole file rather than crash.
                nc2csv_logger.warning("{}: {} skipped.".format(ofe, file_name))
                return None
            nc2csv_logger.debug("created date_time column")
            # Set the new column as the index so we can do date based slicing for dqr filtering later.
            df.set_index("date_time", inplace=True)
            nc2csv_logger.debug("Set date_time column as index.")

        # Skip variables not present in this file's variable keys; this occurs
        # when a var is selected and it changes name (both names are in var_list).
        elif var not in rootgrp.variables.keys():
            msg = "WARNING {} not found in {}".format(var, os.path.basename(file_name))
            nc2csv_logger.warning(msg)
        else:
            dim_len, dim = len(rootgrp.variables[var].dimensions), rootgrp.variables[var].dimensions

            # 0 dimensions: a scalar constant.
            if dim_len == 0:
                # Broadcast the scalar across a column as long as the time axis.
                nc2csv_logger.debug("{} is constant".format(var))
                df[var] = [rootgrp.variables[var][0]] * len(rootgrp.variables["time_offset"])
            # 1 dimension on 'time': a plain time series column.
            elif dim_len == 1 and dim[0] == "time":
                nc2csv_logger.debug("{} is 1 dimension".format(var))
                df[var] = rootgrp.variables[var][:]
            elif dim_len == 2 and dim[0] == "time":
                # 2 dimensional vars are intentionally not converted (agreed feature cut).
                msg = "{} is 2 dimensional and will not be converted. Dims: {}".format(var, dim)
                nc2csv_logger.warning(msg)
                # This would convert 2 dimensional variables into multiple columns but it was agreed we should not support this feature.
                # This program needs to work on python version 3.5.0 which does not support multi-dimensional list comprehensions.
                # try:
                #     for dim_number in range(len(rootgrp.variables['concentration'].dimensions[1])):
                #         var_heading = "{}_{}".format(var, dim_number)
                #         nc2csv_logger.debug("var_heading: {}".format(var_heading))
                #         data = [d[dim_number] for d in rootgrp.variables[var][:].data]
                #         df[var_heading] = data
                # except IndexError:
                #     pass
            elif dim_len > 2:
                # if length of dimensions > 2 then... I've not seen this yet - May 4th 2017
                msg = "WARNING: {} in {} > 2 dimensional and will not be converted. Dims: {}".format(var, os.path.basename(file_name), dim)
                nc2csv_logger.warning(msg)
    nc2csv_logger.debug("Finished variable extraction for {}".format(file_name))

    # Mask DQR-flagged spans when a filter level was requested.
    if DQRfilter:
        nc2csv_logger.debug("Filtering {} for dqr status {}".format(file_name, DQRfilter))

        def DQRfiltering(df: pandas.DataFrame, dqr_ranges: dict, missing_num: int) -> pandas.DataFrame:
            """Slice-and-replace the missing value in place of any data covered
            by a data quality report, independently per variable.

            :param df (pandas.DataFrame): The DataFrame of the current file's partially converted data.
            :param dqr_ranges (dict): A dictionary (key=var) containing tuples with variable level dqr data.
            :param missing_num (int): Value to mask filtered values with.
            :return (pandas.DataFrame): DataFrame with flagged values replaced by missing_num.
            """
            df_start = df.first_valid_index()
            df_end = df.last_valid_index()
            for var in dqr_ranges.keys():
                if var in df.keys():
                    for record in dqr_ranges[var]:
                        # record[0]/record[1] are start/end as YYYYmmdd.HHMMSS strings.
                        dqr_start = datetime.strptime(record[0], '%Y%m%d.%H%M%S')
                        dqr_end = datetime.strptime(record[1], '%Y%m%d.%H%M%S')
                        if dqr_start > df_end or dqr_end < df_start:
                            # DQR range does not overlap this file; nothing to mask.
                            pass
                        else:
                            nc2csv_logger.debug("Filtering {}:{} from {} to {}".format(file_name, var, dqr_start, dqr_end))
                            df.loc[dqr_start:dqr_end, var] = missing_num
            return df

        try:
            # Per-file missing value attribute; fall back to the conventional -9999.
            missing_num = rootgrp.getncattr('missing-data')
        except AttributeError:
            missing_num = -9999
        # then filter by dqr ranges by variable
        df = DQRfiltering(df, dqr_ranges, missing_num)
        nc2csv_logger.debug("Finished dqr filtering for {}".format(file_name))

    # Get the first valid index, which is a Timestamp object, and format it.
    start_time = df.first_valid_index().__format__("%Y%m%d.%H%M%S")
    nc2csv_logger.debug('first_valid_index = {}({})'.format(type(start_time), start_time))

    # Output name is <datastream>.<start time>.custom.csv inside out_dir.
    outbase = "{}.{}.custom.csv".format(datastream, start_time)
    output_file = os.path.join(out_dir, outbase)
    nc2csv_logger.debug("full output_file = {}".format(output_file))

    # write pandas dataframe to file
    nc2csv_logger.debug("writing result to output file ... ")
    df.to_csv(output_file, index=True, encoding="ascii")
    nc2csv_logger.debug("Done processing -> {}".format(output_file))

def dump_header(args: argparse.Namespace) -> None:
    """Dump the netCDF header (ncdump -h) of the first file, and again every
    time the set of global attribute names changes between files — not every
    time values change, which would be every file.

    Bug fix: ``head_attrs`` was previously reassigned from the current dataset
    and then compared against that same dataset, so the comparison was always
    equal and only the first header was ever dumped. The previous file's
    attribute names are now carried across iterations.

    :param args (argparse.Namespace): Object with input arguments and added global variables as dot properties.
    :return (None): No return value, writes *.header.txt files to args.out_dir.
    """

    def _dump(path: str, label: str) -> None:
        """Write one file's header via ncdump into the output directory."""
        base_name = ".".join(os.path.basename(path).split(".")[:4])
        output_header_name = "{}.header.txt".format(base_name)
        output_header_path = os.path.join(args.out_dir, output_header_name)
        # NOTE(review): os.system with interpolated paths assumes trusted,
        # shell-safe file names — consider subprocess with an argument list.
        cmd = "ncdump -h {} > {}".format(path, output_header_path)
        nc2csv_logger.debug(cmd)
        os.system(cmd)
        nc2csv_logger.debug("{} - cmd = {}".format(label, cmd))

    prev_attrs = None
    for i, f in enumerate(args.files):
        if not os.path.isfile(f):
            continue
        ds = netCDF4.Dataset(f)
        head_attrs = ds.ncattrs()
        if i == 0:
            _dump(f, "first header")
        elif head_attrs != prev_attrs:
            nc2csv_logger.debug('writing new header file.')
            _dump(f, "additional header")
        prev_attrs = head_attrs
    nc2csv_logger.debug("done dumping headers")

def merge_output(args: argparse.Namespace):
    """Merge the converted ascii csv files into as few files as possible,
    each capped at ~2.15 Gb.

    Bug fixes: a file that alone exceeded the cap made the merge slice empty,
    so ``pandas.concat([])`` raised or ``merge()`` recursed forever on the same
    list — that case now warns and stops. Also removed a no-op
    ``.format(args.datastream)`` on a placeholder-free search pattern.

    :param args (argparse.Namespace): Object with input arguments and added global variables as dot properties.
    :return (None): Writes merged csv file(s) and removes the inputs that were merged.
    """
    # Converted files all end with custom.csv inside the output directory.
    search_exp = os.path.join(args.out_dir, "*custom.csv")
    nc2csv_logger.debug("Search expression: {}".format(search_exp))
    tmpfiles = file_search(search_exp)

    def merge(tmpfiles: list) -> None:
        """RECURSIVE: merge as many leading files as fit under max_size, then
        recurse on the remainder of the list.

        :param tmpfiles (list): List of converted files to merge.
        :return (None): Outputs a merged file but no return value.
        """
        nc2csv_logger.debug('number of starting files: {}'.format(len(tmpfiles)))
        max_size = 2147483648  # ~2.15 Gb
        nc2csv_logger.debug('looping through tmp files: {}'.format(tmpfiles))

        # Find how many leading files fit under the size cap.
        total_size = 0
        new_start_index = len(tmpfiles)
        for i, f in enumerate(tmpfiles):
            total_size += os.path.getsize(f)
            if total_size > max_size:
                new_start_index = i
                nc2csv_logger.debug('Current total size {} next start index {}'.format(total_size, i))
                break

        if new_start_index == 0:
            # Either the list is empty or the next file alone exceeds the cap;
            # recursing here would loop forever on the same list.
            nc2csv_logger.warning('No merging possible. Files to large.')
            return

        nc2csv_logger.debug('*** merging files ***')
        for f in tmpfiles[:new_start_index]:
            nc2csv_logger.debug('Merging {}'.format(f))
        dataframes = csv_2_df_list(tmpfiles[:new_start_index])
        # This program needs to work on python version 3.5.0 in which the
        # compatible version of pandas does not support the "sort" keyword:
        # concat_frames = pandas.concat(dataframes, sort=True)
        concat_frames = pandas.concat(dataframes)

        outfile_path = create_output_path(args.out_dir, tmpfiles[:new_start_index])
        nc2csv_logger.info("Output file --> {}".format(outfile_path))
        # concat_frames.sort_values("date_time", inplace=True, ascending=True)
        concat_frames.to_csv(outfile_path, index=False, encoding="ascii")
        nc2csv_logger.debug("Wrote output file.")

        if os.path.isfile(outfile_path):
            # If output was successful then delete the individual csv files.
            nc2csv_logger.debug("Removing merged files.")
            remove_files(tmpfiles[:new_start_index])
        if len(tmpfiles) > new_start_index:
            nc2csv_logger.debug("Recursively calling merge again; [{},{}]".format(new_start_index, len(tmpfiles)))
            merge(tmpfiles[new_start_index:])

    merge(tmpfiles)
    nc2csv_logger.debug("Done merging output")

def file_search(search_exp: str) -> list:
    """Get the list of csv files created by converting the cdf/nc input files.

    These files will attempt to be merged into fewer, larger files and the
    originals will then be deleted.

    :param search_exp (str): Glob expression used to search for files to merge,
        should be something like '*.custom.csv'.
    :return (list): Sorted list of csv files to try and merge; empty list when
        nothing matches (the original could implicitly return None on an
        impossible NameError path, which would crash callers iterating it).
    """
    # glob() on a bound str argument cannot raise NameError, so the previous
    # try/except NameError (and the single-argument os.path.join no-op) is gone.
    tmpfiles = glob(search_exp)
    nc2csv_logger.debug("Tmpfiles aquired.")
    if not tmpfiles:
        nc2csv_logger.warning("Empty file list for merging.")
    return sorted(tmpfiles)

def csv_2_df_list(files: list) -> list:
    """Load every csv file into a pandas DataFrame.

    :param files (list): Fully qualified paths to csv files.
    :return (list): One pandas DataFrame per input path, in input order.
    """
    nc2csv_logger.debug("Creating list of dataframes")
    return [pandas.read_csv(path) for path in files]

def create_output_path(out_dir: str, files: list) -> str:
    """Build the merged-output file path with start and end datetimes embedded.

    The name keeps the first four dot-fields (datastream, level, date, time) of
    the earliest file and appends dot-fields 2:4 (date, time) of the latest
    file, e.g. ``sgpmetE13.b1.20200101.000000-20200102.000000.custom.merged.csv``.

    :param out_dir (str): Output path for merged files.
    :param files (list): List of files that are going to be merged. Could be a
        slice of the full list due to max size restriction. The caller's list
        is NOT modified.
    :return (str): Fully qualified merged file name with new start and end
        datetime embedded.
    """
    # Sort a copy instead of files.sort(): the original mutated the caller's
    # argument in place, which is surprising and unnecessary.
    ordered = sorted(files)
    # Strip any directory from the earliest file before taking its leading
    # fields; the latest file keeps the original full-path split for
    # backward-compatible field indexing.
    start_part = '.'.join(ordered[0].split('/')[-1].split(".")[0:4])
    end_part = '.'.join(ordered[-1].split(".")[2:4])

    # join them on a dash and add the merged-file suffix
    outfile_name = "{}-{}.custom.merged.csv".format(start_part, end_part)

    # make fully qualified path to outfile using out_dir and base file name
    return os.path.join(out_dir, outfile_name)

def remove_files(files: list):
    """Delete files that have already been merged.

    :param files (list): Possibly a slice of the full list, because files are
        recursively merged and then deleted.
    :return None: No return value; a warning is logged (to the
        conversion_report.txt file in the output directory) for any file that
        has already disappeared.
    """
    nc2csv_logger.debug("Removing files.")
    for path in files:
        try:
            os.remove(path)
        except FileNotFoundError as fnfe:
            nc2csv_logger.warning('Could not remove file: {} : {}'.format(path, fnfe))
    nc2csv_logger.debug("Done removing files")
if __name__ == "__main__":
    try:
        args = parse_arguments()
        nc2csv(args)
    except Exception:
        # Log the failure with its traceback, then re-raise the ORIGINAL
        # exception. The previous `raise Exception` replaced it with a bare,
        # message-less Exception, discarding type, message, and traceback.
        nc2csv_logger.exception("Conversion failed.")
        raise
    finally:
        nc2csv_logger.info("*"*80)