Another possible workaround, when building an ML pipeline with PySpark and H2O's Python API, would be to first convert the PySpark dataframes to H2O dataframes and then do the following (a code sketch follows the list):

1. Create a new dataframe (call this group_df) from the feature dataframe using drop_duplicates, with the columns argument set to the list of columns that form the group: https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.drop_duplicates
2. Create a new, single-column dataframe (call this kfold_df) from group_df using kfold_column: https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.kfold_column
3. Column-bind kfold_df back onto group_df with cbind: https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.cbind
4. Merge group_df back into the feature dataframe on the common columns (the group columns specified in step 1) to create the feature dataset with the fold column: https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-munging/merging-data.html. Observations with the same values in the group columns will then share the same fold index in the fold column.
5. Configure the parameters of the H2O AutoML job as per https://docs.h2o.ai/h2o/latest-stable/h2o-docs/automl.html, making sure the fold_column argument is set to the name of the column created from calling kfold_column (https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/fold_column.html), that class imbalance is handled, and so on.
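Here is a minimal sketch of steps 1-5 in H2O's Python API. It assumes the feature frame has already been converted to an H2OFrame (e.g. via pysparkling's H2OContext.asH2OFrame); features_hf, group_cols, fold_col, target and nfolds are illustrative placeholders, not names from the docs above:

```
import h2o
from h2o.automl import H2OAutoML

# Placeholder names; adapt to your data.
group_cols = ["user_id"]   # columns that define a group
fold_col = "fold"          # name of the fold-assignment column
target = "label"           # response column
nfolds = 5

# Step 1: one row per group, keeping only the group columns.
group_df = features_hf[group_cols].drop_duplicates(columns=group_cols)

# Step 2: assign each group a fold index in [0, nfolds).
kfold_df = group_df.kfold_column(n_folds=nfolds, seed=42)
kfold_df.set_names([fold_col])

# Step 3: column-bind the fold indices back onto the group frame.
group_df = group_df.cbind(kfold_df)

# Step 4: merge on the common (group) columns, so every observation
# in a group gets the same fold index.
features_hf = features_hf.merge(group_df, all_x=True)

# Step 5: point AutoML at the fold column instead of letting it
# split rows independently of their groups.
x_cols = [c for c in features_hf.columns if c not in (target, fold_col)]
aml = H2OAutoML(max_models=20, seed=42, balance_classes=True)
aml.train(x=x_cols, y=target, training_frame=features_hf, fold_column=fold_col)
```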
Any suggestions? Are there already similar wrapper functions in PySpark or H2O that assign fold indices based on groups of observations, rather than on individual observations? Should I propose this on the d...@spark.apache.org mailing list?

On Fri, Jan 7, 2022 at 1:04 PM Chester Gan <c.m.s....@gmail.com> wrote:

> Idea: PySpark function to create fold indices (numbers from 0, ..., N-1,
> where N := number of folds needed for k-fold CV during auto ML training)
> on train & test datasets
>
> ```
> # train & test are PySpark dataframes of the train & test datasets
> # respectively
> import pyspark.sql.functions as F
> from functools import reduce  # For Python 3.x
> from pyspark.sql import DataFrame
>
>
> def unionAll(*dfs):
>     return reduce(DataFrame.unionAll, dfs)
>
>
> def create_fold_column(input_df,
>                        list_of_fold_group_columns=list_of_fold_group_columns,
>                        fold_column=fold_column, nfolds=nfolds):
>
>     if fold_column:
>         # Split into folds: we get a list of Spark dataframes of group ids
>         fold_group_ids_list_of_dataframes = input_df.drop_duplicates(
>             subset=list_of_fold_group_columns
>         )[list_of_fold_group_columns].randomSplit(nfolds * [1 / nfolds],
>                                                   seed=42)
>
>         # Tag each split of group ids with its fold index
>         for index in range(0, len(fold_group_ids_list_of_dataframes)):
>             fold_group_ids_list_of_dataframes[index] = \
>                 fold_group_ids_list_of_dataframes[index].withColumn(
>                     fold_column, F.lit(index))
>
>         # Stack the splits back together, then join the fold indices
>         # onto the full dataset by the group columns
>         fold_groups_ids_dataframes_union = \
>             unionAll(*fold_group_ids_list_of_dataframes)
>         input_df = input_df.join(fold_groups_ids_dataframes_union,
>                                  on=list_of_fold_group_columns)
>
>     return input_df
>
>
> train = train.transform(create_fold_column)
> # Dummy fold_column with a single number, nfolds (typically 5), to prevent
> # H2O from error-ing out on the test set
> if fold_column:
>     test = test.withColumn(fold_column, F.lit(nfolds))
> ```
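P.S. The helper above pulls list_of_fold_group_columns, fold_column and nfolds from module-level globals so that it can be passed bare to DataFrame.transform; called with explicit arguments instead (column names here are hypothetical placeholders), it would look like:

```
# Hypothetical, self-contained invocation of create_fold_column;
# "user_id" and "fold" are illustrative placeholders.
train = create_fold_column(train,
                           list_of_fold_group_columns=["user_id"],
                           fold_column="fold",
                           nfolds=5)
test = test.withColumn("fold", F.lit(5))  # dummy out-of-range fold index
```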