import os import yaml import logging import awkward as ak import pandas as pd import numpy as np import tqdm import utils if __name__ == "__main__": # Setup Args parser = utils.get_common_args(prog="CSV -> Pandas Scorecard Converter") args = parser.parse_args() # Setup Logging utils.setup_logging(args.debug) logger = logging.getLogger("CSVPandasConverter") scorecard_dir = os.path.join(args.data_dir, "scorecard") scorecard_dir = os.path.join(scorecard_dir, os.listdir(scorecard_dir)[0]) logger.info(f"Loading College Scorecard data from directory {scorecard_dir}") logger.debug("Loading metadata from data.yaml") with open(os.path.join(scorecard_dir, 'data.yaml'), 'r') as file: data = yaml.safe_load(file) logger.info("Loading all CSV files as Pandas dataframes") files = [f'MERGED{i}_{(i + 1) % 100:02}_PP.csv' for i in tqdm.trange(1996, 2024)] dataframes = [pd.read_csv(os.path.join(scorecard_dir, file)) for file in tqdm.tqdm(files)] logger.info("Creating list of all UNITIDs across all files") unit_ids = np.unique(np.hstack([frame.UNITID.to_numpy() for frame in tqdm.tqdm(dataframes)])) logger.info("Appending extra columns to each year's dataframes to prepare for appending") for i, frame in tqdm.tqdm(enumerate(dataframes)): new_rows = pd.DataFrame({"UNITID": unit_ids[~np.isin(unit_ids, frame.UNITID)]}) dataframes[i] = pd.concat([frame, new_rows]).sort_values(by=["UNITID", "OPEID"]) logger.info("Converting to Results Array") result = {} for key, sec in tqdm.tqdm(data['dictionary'].items()): if 'calculate' in sec: continue data_key = sec['source'] if data_key not in dataframes[0]: continue parts = key.split('.') section = result for i in range(len(parts) - 1): part = parts[i] if part not in section: section[part] = {} section = section[part] obj = np.vstack([frame[data_key] for frame in dataframes]).T for frame in dataframes: del frame[data_key] # Memory cleanup if obj.dtype == object: obj = obj.astype(str) section[parts[-1]] = obj logger.info("Cleanup: Deleting Dataframes from Memory") del dataframes # Memory cleanup logger.info("Converting to Awkward Array") a = ak.Array(result) del result # Memory cleanup logger.info("Writing to Disk") ak.to_parquet(a, os.path.join(scorecard_dir, "merged.parquet"))