Has anyone else used PySpark dataframes in conjunction with H2O for ML
pipelining, and had to use custom folds to keep rows/observations of
the same group (e.g. user account, vehicle, city, or whatever) in the same
validation fold, so as to prevent data leakage during cross-validation?

On Fri, Jan 7, 2022 at 1:04 PM Chester Gan <c.m.s....@gmail.com> wrote:

> Idea: PySpark function to create fold indices (numbers from 0, ..., N-1,
> where N := number of folds needed for k-fold CV during auto ML training) on
> train & test datasets
>
> ```
> # train & test are PySpark dataframes of the train & test datasets respectively
> import pyspark.sql.functions as F
> from functools import reduce  # For Python 3.x
> from pyspark.sql import DataFrame
>
> def unionAll(*dfs):
>     return reduce(DataFrame.unionAll, dfs)
>
> def create_fold_column(input_df,
>                        list_of_fold_group_columns=list_of_fold_group_columns,
>                        fold_column=fold_column,
>                        nfolds=nfolds):
>
>     if fold_column:
>         # split_into_frikkin_folds... we get a list of Spark dataframes of group ids
>         fold_group_ids_list_of_dataframes = input_df.drop_duplicates(
>             subset=list_of_fold_group_columns
>         )[list_of_fold_group_columns].randomSplit(nfolds * [1 / nfolds], seed=42)
>
>         for index in range(0, len(fold_group_ids_list_of_dataframes)):
>             fold_group_ids_list_of_dataframes[index] = (
>                 fold_group_ids_list_of_dataframes[index].withColumn(fold_column, F.lit(index))
>             )
>
>         fold_groups_ids_dataframes_union = unionAll(*fold_group_ids_list_of_dataframes)
>         input_df = input_df.join(fold_groups_ids_dataframes_union,
>                                  on=list_of_fold_group_columns)
>
>     return(input_df)
>
> train = train.transform(create_fold_column)
> # Dummy fold_column with single number, nfolds (typically 5), to prevent
> # H2O from error-ing out
> if fold_column:
>     test = test.withColumn(fold_column, F.lit(nfolds))
> ```
>

Reply via email to