Commits (2)
......@@ -6,3 +6,6 @@ logs/
.idea/
.git/
__pycache__
.DS_Store
.notes
packages/config.py
......@@ -44,7 +44,6 @@ import logging
import pandas
import requests
import multiprocessing
import numpy
from functools import partial
from datetime import datetime
from glob import glob
......@@ -59,7 +58,8 @@ nc2csv_logger = logging.getLogger("nc2csv")
site_packages = "/apps/base/python3.5/lib/python3.5/site-packages"
sys.path.insert(0, site_packages)
try:
from netCDF4 import Dataset
import netCDF4
# from netCDF4 import Dataset
except (ImportError, ModuleNotFoundError) as e:
nc2csv_logger.critical("Error importing netCDF4.\nThe program thought it would be in: {}\n{}".format(site_packages, e))
import argparse
......@@ -161,8 +161,13 @@ def nc2csv(args: argparse.Namespace) -> int :
args.__setattr__('dqr_timeblocks', parse_dqrs(args.datastream, args.variables, args.out_dir, args.DQRfilter))
write_dqr_reference(args.dqr_timeblocks, args.out_dir)
# this is done here to make sure that the date_time variable is in the varlist for indexing and dqr filtering.
args.__setattr__('variables', format_variables(args.variables))
nc2csv_logger.info('Starting process pool.')
process_pool(args)
# For debugging, multiprocessing will mask some errors.
# for file_name in args.files:
# process_one_file(args.variables, args.datastream, args.out_dir, args.DQRfilter, args.dqr_timeblocks, file_name)
nc2csv_logger.info('Dumping headers for {}'.format(args.datastream))
dump_header(args)
......@@ -404,7 +409,7 @@ def process_one_file(variables: list, datastream: str, out_dir: str,
:return:
"""
nc2csv_logger.debug("Begin processing <- {}".format(file_name))
rootgrp = Dataset(file_name, 'r')
rootgrp = netCDF4.Dataset(file_name, 'r')
# if the flag for all variables is set
if "all" in variables:
# populate a temp var_list with all variables
......@@ -413,9 +418,7 @@ def process_one_file(variables: list, datastream: str, out_dir: str,
for key in rootgrp.variables.keys():
if key not in variables:
variables.append(key)
# this is done here to make sure that the date_time variable is in the varlist for indexing and dqr filtering.
variables = format_variables(variables)
variables = format_variables(variables)
# create Pandas Dataframe to hold the columns of new file
df = pandas.DataFrame()
......@@ -425,14 +428,15 @@ def process_one_file(variables: list, datastream: str, out_dir: str,
# create datetime column
if var == "date_time":
temp_datetimes = []
for val in rootgrp["time_offset"][:]:
temp_timestamp = numpy.asscalar(val + rootgrp["base_time"][0])
temp_datetime = datetime.utcfromtimestamp(temp_timestamp).strftime("%Y-%m-%d %H:%M:%S")
temp_datetimes.append(temp_datetime)
df["date_time"] = temp_datetimes
df.set_index("date_time", inplace=True)
            # Vectorized operation to create pandas Timestamp objects out of basetime and time offset.
            # This is possible without referencing base_time directly because the time_offset variable's
            # units attribute on the netCDF4 Dataset encodes the epoch/base time.
df["date_time"] = netCDF4.num2date(rootgrp.variables['time_offset'][:],
rootgrp.variables['time_offset'].units)
nc2csv_logger.debug("created date_time column")
# Set the new column as the index so we can do date based slicing for dqr filtering later.
df.set_index("date_time", inplace=True)
nc2csv_logger.debug("Set date_time column as index.")
# remove all variables not in this file's variable keys
# this occurs if a var is selected and it changes name (both names are included in var_list)
......@@ -485,8 +489,8 @@ def process_one_file(variables: list, datastream: str, out_dir: str,
for var in dqr_ranges.keys():
if var in df.keys():
for record in dqr_ranges[var]:
dqr_start = str(datetime.strptime(record[0], '%Y%m%d.%H%M%S'))
dqr_end = str(datetime.strptime(record[1], '%Y%m%d.%H%M%S'))
dqr_start = datetime.strptime(record[0], '%Y%m%d.%H%M%S')
dqr_end = datetime.strptime(record[1], '%Y%m%d.%H%M%S')
if dqr_start > df_end or dqr_end < df_start:
pass
else:
......@@ -502,14 +506,9 @@ def process_one_file(variables: list, datastream: str, out_dir: str,
df = DQRfiltering(df, dqr_ranges, missing_num)
nc2csv_logger.debug("Finished dqr filtering for {}".format(file_name))
# replace elements of the start time datetime object to get in new format for file name
start_time = df.first_valid_index()
nc2csv_logger.debug('first_valid_index = {}'.format(start_time))
to_replace = [('-', ''), (':', ''), (' ', '.')]
for item in to_replace:
start_time = start_time.replace(item[0], item[1])
nc2csv_logger.debug("converted first_valid_index = {}".format(start_time))
# Get the first valid index, which is a Timestamp object, and format it.
start_time = df.first_valid_index().__format__("%Y%m%d.%H%M%S")
nc2csv_logger.debug('first_valid_index = {}({})'.format(type(start_time), start_time))
# get the first 4 parts of the first file and join them on a period and add new ending
outbase = "{}.{}.custom.csv".format(datastream, start_time)
......@@ -533,7 +532,7 @@ def dump_header(args: argparse.Namespace) -> None:
"""
for i, f in enumerate(args.files):
if os.path.isfile(f):
ds = Dataset(f)
ds = netCDF4.Dataset(f)
head_attrs = ds.ncattrs()
if i == 0:
base_name = ".".join(os.path.basename(f).split(".")[:4])
......@@ -598,18 +597,21 @@ def merge_output(args: argparse.Namespace):
for f in tmpfiles[:new_start_index]:
nc2csv_logger.debug('Merging {}'.format(f))
dataframes = csv_2_df_list(tmpfiles[:new_start_index])
concat_frames = pandas.concat(dataframes)
concat_frames = pandas.concat(dataframes, sort=True)
outfile_path = create_output_path(args.out_dir, tmpfiles[:new_start_index])
nc2csv_logger.info("Output file --> {}".format(outfile_path))
concat_frames.sort_values("date_time", inplace=True, ascending=True)
# concat_frames.sort_values("date_time", inplace=True, ascending=True)
concat_frames.to_csv(outfile_path, index=False, encoding="ascii")
nc2csv_logger.debug("Wrote output file.")
if os.path.isfile(outfile_path):
        # if output successful then delete the individual csv files
nc2csv_logger.debug("Removing merged files.")
remove_files(tmpfiles[:new_start_index])
if len(tmpfiles) > new_start_index:
nc2csv_logger.debug("Recursively calling merge again; [{},{}]".format(new_start_index, len(tmpfiles)))
merge(tmpfiles[new_start_index:])
merge(tmpfiles)
......