feat: final push for submission
This commit is contained in:
104
convert.py
104
convert.py
@@ -1,60 +1,78 @@
|
||||
import os
|
||||
import yaml
|
||||
import logging
|
||||
|
||||
import awkward as ak
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
import tqdm
|
||||
|
||||
scorecard_dir = "data/scorecard"
|
||||
scorecard_dir = os.path.join(scorecard_dir, os.listdir(scorecard_dir)[0])
|
||||
import utils
|
||||
|
||||
print("Loading data.yaml")
|
||||
with open(os.path.join(scorecard_dir, 'data.yaml'), 'r') as file:
|
||||
data = yaml.safe_load(file)
|
||||
|
||||
print("Loading CSVs to dataframes")
|
||||
files = [f'MERGED{i}_{(i + 1) % 100:02}_PP.csv' for i in tqdm.trange(1996, 2024)]
|
||||
dataframes = [pd.read_csv(os.path.join(scorecard_dir, file)) for file in tqdm.tqdm(files)]
|
||||
|
||||
print("Appending extra rows where needed")
|
||||
unit_ids = np.unique(np.hstack([frame.UNITID.to_numpy() for frame in tqdm.tqdm(dataframes)]))
|
||||
for i, frame in tqdm.tqdm(enumerate(dataframes)):
|
||||
new_rows = pd.DataFrame({"UNITID": unit_ids[~np.isin(unit_ids, frame.UNITID)]})
|
||||
dataframes[i] = pd.concat([frame, new_rows]).sort_values(by=["UNITID", "OPEID"])
|
||||
|
||||
print("Converting to Results Array")
|
||||
result = {}
|
||||
for key, sec in tqdm.tqdm(data['dictionary'].items()):
|
||||
if 'calculate' in sec:
|
||||
continue
|
||||
|
||||
data_key = sec['source']
|
||||
if data_key not in dataframes[0]:
|
||||
continue
|
||||
if __name__ == "__main__":
|
||||
# Setup Args
|
||||
parser = utils.get_common_args(prog="CSV -> Pandas Scorecard Converter")
|
||||
args = parser.parse_args()
|
||||
|
||||
parts = key.split('.')
|
||||
section = result
|
||||
for i in range(len(parts) - 1):
|
||||
part = parts[i]
|
||||
if part not in section:
|
||||
section[part] = {}
|
||||
section = section[part]
|
||||
# Setup Logging
|
||||
utils.setup_logging(args.debug)
|
||||
logger = logging.getLogger("CSVPandasConverter")
|
||||
|
||||
obj = np.vstack([frame[data_key] for frame in dataframes]).T
|
||||
for frame in dataframes:
|
||||
del frame[data_key] # Memory cleanup
|
||||
scorecard_dir = os.path.join(args.data_dir, "scorecard")
|
||||
scorecard_dir = os.path.join(scorecard_dir, os.listdir(scorecard_dir)[0])
|
||||
logger.info(f"Loading College Scorecard data from directory {scorecard_dir}")
|
||||
|
||||
logger.debug("Loading metadata from data.yaml")
|
||||
with open(os.path.join(scorecard_dir, 'data.yaml'), 'r') as file:
|
||||
data = yaml.safe_load(file)
|
||||
|
||||
logger.info("Loading all CSV files as Pandas dataframes")
|
||||
files = [f'MERGED{i}_{(i + 1) % 100:02}_PP.csv' for i in tqdm.trange(1996, 2024)]
|
||||
dataframes = [pd.read_csv(os.path.join(scorecard_dir, file)) for file in tqdm.tqdm(files)]
|
||||
|
||||
logger.info("Creating list of all UNITIDs across all files")
|
||||
unit_ids = np.unique(np.hstack([frame.UNITID.to_numpy() for frame in tqdm.tqdm(dataframes)]))
|
||||
|
||||
logger.info("Appending extra columns to each year's dataframes to prepare for appending")
|
||||
for i, frame in tqdm.tqdm(enumerate(dataframes)):
|
||||
new_rows = pd.DataFrame({"UNITID": unit_ids[~np.isin(unit_ids, frame.UNITID)]})
|
||||
dataframes[i] = pd.concat([frame, new_rows]).sort_values(by=["UNITID", "OPEID"])
|
||||
|
||||
logger.info("Converting to Results Array")
|
||||
result = {}
|
||||
for key, sec in tqdm.tqdm(data['dictionary'].items()):
|
||||
if 'calculate' in sec:
|
||||
continue
|
||||
|
||||
data_key = sec['source']
|
||||
if data_key not in dataframes[0]:
|
||||
continue
|
||||
|
||||
if obj.dtype == object:
|
||||
obj = obj.astype(str)
|
||||
section[parts[-1]] = obj
|
||||
parts = key.split('.')
|
||||
section = result
|
||||
for i in range(len(parts) - 1):
|
||||
part = parts[i]
|
||||
if part not in section:
|
||||
section[part] = {}
|
||||
section = section[part]
|
||||
|
||||
obj = np.vstack([frame[data_key] for frame in dataframes]).T
|
||||
for frame in dataframes:
|
||||
del frame[data_key] # Memory cleanup
|
||||
|
||||
if obj.dtype == object:
|
||||
obj = obj.astype(str)
|
||||
section[parts[-1]] = obj
|
||||
|
||||
print("Cleanup: Deleting Dataframes from Memory")
|
||||
del dataframes # Memory cleanup
|
||||
logger.info("Cleanup: Deleting Dataframes from Memory")
|
||||
del dataframes # Memory cleanup
|
||||
|
||||
print("Converting to Awkward Array")
|
||||
a = ak.Array(result)
|
||||
del result # Memory cleanup
|
||||
|
||||
print("Writing to Disk")
|
||||
ak.to_parquet(a, os.path.join(scorecard_dir, "merged.parquet"))
|
||||
logger.info("Converting to Awkward Array")
|
||||
a = ak.Array(result)
|
||||
del result # Memory cleanup
|
||||
|
||||
logger.info("Writing to Disk")
|
||||
ak.to_parquet(a, os.path.join(scorecard_dir, "merged.parquet"))
|
||||
Reference in New Issue
Block a user