Another possible workaround, when building an ML pipeline with PySpark and
H2O's Python API, would be to first convert the PySpark dataframes to
H2OFrames and then do the following (a rough end-to-end sketch appears after
this list):


   1. Create a new dataframe (call this group_df) from the feature dataframe
   using drop_duplicates, with the columns argument set to the list of
   columns that form the group:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.drop_duplicates
   2. Create a new, single-column dataframe (call this kfold_df) from
   group_df using kfold_column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.kfold_column
   3. Column-bind kfold_df back onto group_df with cbind:
   https://docs.h2o.ai/h2o/latest-stable/h2o-py/docs/frame.html#h2o.H2OFrame.cbind
   4. Merge group_df back onto the feature dataframe on the common columns
   (the group columns specified in step 1) to create the feature dataset
   with the fold column:
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-munging/merging-data.html
   Observations with the same values in the group columns will then share
   the same fold index in the fold column.
   5. Configure the parameters of the H2O AutoML job as per
   https://docs.h2o.ai/h2o/latest-stable/h2o-docs/automl.html, ensuring that
   the fold_column argument is set to the name of the column created by
   kfold_column
   (https://docs.h2o.ai/h2o/latest-stable/h2o-docs/data-science/algo-params/fold_column.html),
   that class imbalance is rectified, and so on.
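
Here is a minimal sketch of those five steps, assuming an already-running
H2O cluster, Sparkling Water (pysparkling) for the PySpark-to-H2O
conversion, and placeholder names (features_df, group_cols, "fold",
"target") that would need to be adapted to the actual data:

```
import h2o
from h2o.automl import H2OAutoML
from pysparkling import H2OContext

# Convert the PySpark feature dataframe to an H2OFrame. Depending on the
# Sparkling Water version, getOrCreate() may need the SparkSession passed in.
hc = H2OContext.getOrCreate()
features_hf = hc.asH2OFrame(features_df)  # features_df: a PySpark DataFrame

group_cols = ["group_id"]  # placeholder: columns that form the group
nfolds = 5

# Step 1: one row per unique group.
group_df = features_hf.drop_duplicates(columns=group_cols, keep="first")[group_cols]

# Step 2: a single-column frame assigning each group to one of nfolds folds.
kfold_df = group_df.kfold_column(n_folds=nfolds, seed=42)
kfold_df.set_names(["fold"])

# Step 3: column-bind the fold indices onto the group keys.
group_df = group_df.cbind(kfold_df)

# Step 4: merge back on the common (group) columns, so every observation in
# a group carries the same fold index.
features_hf = features_hf.merge(group_df, all_x=True)

# Step 5: point AutoML at the fold column (instead of setting nfolds).
aml = H2OAutoML(max_models=20, seed=42, balance_classes=True)
aml.train(y="target", training_frame=features_hf, fold_column="fold")
```

Because the merge keys on the shared group columns, every row of a group
lands in the same fold, which is exactly the grouped-CV behaviour wanted
here.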

Any suggestions? Are there already similar wrapper functions in PySpark or
H2O that assign fold indices based on groups of observations, rather than
individual observations?
Should I propose this on the d...@spark.apache.org mailing list?

On Fri, Jan 7, 2022 at 1:04 PM Chester Gan <c.m.s....@gmail.com> wrote:

> Idea: PySpark function to create fold indices (numbers 0, ..., N-1, where
> N := number of folds needed for k-fold CV during AutoML training) on the
> train & test datasets
>
> ```
> # train & test are PySpark dataframes of the train & test datasets
> # respectively. fold_column, nfolds and list_of_fold_group_columns are
> # assumed to be defined elsewhere; example values are given here.
> from functools import reduce  # For Python 3.x
>
> import pyspark.sql.functions as F
> from pyspark.sql import DataFrame
>
> fold_column = "fold"
> nfolds = 5
> list_of_fold_group_columns = ["group_id"]  # columns that form the group
>
> def unionAll(*dfs):
>     return reduce(DataFrame.unionAll, dfs)
>
> def create_fold_column(input_df,
>                        list_of_fold_group_columns=list_of_fold_group_columns,
>                        fold_column=fold_column, nfolds=nfolds):
>     if fold_column:
>         # Split the distinct group keys into nfolds equal random buckets;
>         # we get a list of Spark dataframes of group ids.
>         fold_group_ids_list_of_dataframes = (
>             input_df.drop_duplicates(subset=list_of_fold_group_columns)
>             [list_of_fold_group_columns]
>             .randomSplit(nfolds * [1 / nfolds], seed=42)
>         )
>
>         # Tag each bucket of group ids with its fold index.
>         for index in range(len(fold_group_ids_list_of_dataframes)):
>             fold_group_ids_list_of_dataframes[index] = (
>                 fold_group_ids_list_of_dataframes[index]
>                 .withColumn(fold_column, F.lit(index))
>             )
>
>         # Stack the buckets and join the fold index onto every row of the
>         # input, keyed on the group columns.
>         fold_groups_ids_dataframes_union = unionAll(*fold_group_ids_list_of_dataframes)
>         input_df = input_df.join(fold_groups_ids_dataframes_union,
>                                  on=list_of_fold_group_columns)
>
>     return input_df
>
> train = train.transform(create_fold_column)
>
> # Dummy fold_column with a single value, nfolds (typically 5), to prevent
> # H2O from erroring out on the test set.
> if fold_column:
>     test = test.withColumn(fold_column, F.lit(nfolds))
> ```
>
