Has anyone else used PySpark dataframes in conjunction with H2O for ML pipelining, and had to use custom folds to keep rows/observations of the same group (e.g. user account, vehicle, city, or whatever) in the same validation fold, so as to prevent data leakage during cross-validation?
On Fri, Jan 7, 2022 at 1:04 PM Chester Gan <c.m.s....@gmail.com> wrote:
> Idea: PySpark function to create fold indices (numbers from 0, ..., N-1,
> where N := number of folds needed for k-fold CV during auto ML training) on
> train & test datasets
>
> ```
> # train & test are PySpark dataframes of the train & test datasets respectively.
> # list_of_fold_group_columns, fold_column and nfolds are assumed to be
> # defined at module level.
> import pyspark.sql.functions as F
> from functools import reduce  # For Python 3.x
> from pyspark.sql import DataFrame
>
> def unionAll(*dfs):
>     return reduce(DataFrame.unionAll, dfs)
>
> def create_fold_column(input_df,
>                        list_of_fold_group_columns=list_of_fold_group_columns,
>                        fold_column=fold_column, nfolds=nfolds):
>     if fold_column:
>         # Split the distinct group ids into nfolds random, roughly
>         # equal-sized splits; we get a list of Spark dataframes of group ids.
>         fold_group_ids_list_of_dataframes = (
>             input_df.drop_duplicates(subset=list_of_fold_group_columns)
>                     .select(list_of_fold_group_columns)
>                     .randomSplit(nfolds * [1 / nfolds], seed=42)
>         )
>
>         # Tag each split of group ids with its fold index.
>         for index in range(len(fold_group_ids_list_of_dataframes)):
>             fold_group_ids_list_of_dataframes[index] = (
>                 fold_group_ids_list_of_dataframes[index]
>                 .withColumn(fold_column, F.lit(index))
>             )
>
>         # Union the tagged splits back together and join the fold index
>         # onto every row of the original dataframe.
>         fold_groups_ids_dataframes_union = unionAll(*fold_group_ids_list_of_dataframes)
>         input_df = input_df.join(fold_groups_ids_dataframes_union,
>                                  on=list_of_fold_group_columns)
>
>     return input_df
>
> train = train.transform(create_fold_column)
> # Dummy fold_column with a single number, nfolds (typically 5), to prevent
> # H2O from erroring out on the test set.
> if fold_column:
>     test = test.withColumn(fold_column, F.lit(nfolds))
> ```
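The core idea above (all rows sharing a group key land in the same fold) can also be sketched without Spark, e.g. by hashing the group key to a fold index. This is a minimal pure-Python illustration, not the quoted code; the `user` column and `assign_group_folds` helper are made up for the example, and hashing gives only approximately equal fold sizes, unlike `randomSplit`:

```python
import hashlib

def assign_group_folds(rows, group_key, nfolds=5):
    """Assign each row a fold index derived from its group key, so all rows
    of the same group (e.g. the same user account) share a fold."""
    out = []
    for row in rows:
        key = str(row[group_key]).encode("utf-8")
        # Stable hash -> deterministic fold assignment across runs/machines
        fold = int(hashlib.md5(key).hexdigest(), 16) % nfolds
        out.append({**row, "fold": fold})
    return out

rows = [
    {"user": "alice", "x": 1},
    {"user": "alice", "x": 2},
    {"user": "bob", "x": 3},
]
folded = assign_group_folds(rows, "user", nfolds=5)
# Both "alice" rows necessarily get the same fold index
assert folded[0]["fold"] == folded[1]["fold"]
```

The trade-off versus `randomSplit` on distinct group ids: hashing needs no join and no shuffle of the group table, but fold sizes are only balanced in expectation.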