Hi all,

For our use case, we would like to perform an aggregation using a pandas_udf over DataFrames with O(100M) rows and a few tens of columns. Conceptually, this looks a bit like pyspark.RDD.aggregate, where the user provides:
* A "seqOp", which accepts pandas Series(*) and produces an intermediate output
* A "combOp", which combines the intermediate outputs into a final output

There's no direct DataFrame equivalent to RDD.aggregate(), but you can somewhat emulate it with df.groupBy().applyInPandas(seqOp).agg(combOp). However, it seems that calling groupBy() without any columns isn't the intended use. The docs for groupBy().applyInPandas() have the following note:

> Note: This function requires a full shuffle. All the data of a group will be
> loaded into memory, so the user should be aware of the potential OOM risk
> if data is skewed and certain groups are too large to fit in memory.

The Spark SQL guide has a similar note:

> The configuration for maxRecordsPerBatch is not applied on groups and it is
> up to the user to ensure that the grouped data will fit into the available
> memory.

Since we want to perform this aggregation over the entire DataFrame, we end up with a single group that gets loaded into memory in its entirety, which immediately OOMs (requiring a full shuffle probably doesn't help either). To work around this, we make smaller groups: we repartition, add a new column containing the partition ID, and then groupBy that column. That works, but it's not great -- it's a lot of extra data movement.

Am I missing something obvious? Or is this simply a part of the API that isn't fleshed out yet?

Thanks,
Andrew

(*) Unfortunately, Pandas' data model is less rich than that of Spark/Arrow/our code, so the JVM composes an Arrow stream and transmits it to the Python worker, which converts it to pandas before passing it to the UDF. We then have to undo that conversion to recover the original data. It would be nice to have more control over that intermediate conversion.
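For readers less familiar with the RDD.aggregate() contract referenced above, its seqOp/combOp semantics can be sketched in plain Python (the function and variable names here are illustrative, not Spark APIs):

```python
from functools import reduce

def aggregate(partitions, zero_value, seq_op, comb_op):
    # Fold each partition with seq_op, starting from zero_value,
    # then merge the per-partition intermediates with comb_op.
    # zero_value should be immutable (e.g. a tuple) so partitions
    # don't accidentally share accumulator state.
    partials = [reduce(seq_op, part, zero_value) for part in partitions]
    return reduce(comb_op, partials)

# Example: compute a mean as (sum, count) pairs across three "partitions".
data = [[1, 2, 3], [4, 5], [6]]
seq_op = lambda acc, x: (acc[0] + x, acc[1] + 1)
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])
total, count = aggregate(data, (0, 0), seq_op, comb_op)
# total == 21, count == 6, so the mean is 3.5
```

The key property is that seq_op sees one element (or batch) at a time, while comb_op only ever touches the small intermediate outputs -- which is why no single partition's raw data needs to fit in memory all at once.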
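The repartition-plus-partition-ID workaround described above amounts to two-phase aggregation over a synthetic group key. A minimal pandas-only emulation of that pattern (chunked_aggregate and the chunking scheme are hypothetical stand-ins; real Spark code would use repartition() and spark_partition_id() instead):

```python
import pandas as pd

def chunked_aggregate(df, n_chunks, seq_op, comb_op):
    # Assign a synthetic "partition id" to bound per-group size,
    # run seq_op on each group, then reduce the intermediates
    # with comb_op -- mirroring groupBy(pid).applyInPandas(seqOp)
    # followed by a final combine.
    ids = pd.Series(range(len(df))) % n_chunks
    partials = [seq_op(group) for _, group in df.groupby(ids.values)]
    out = partials[0]
    for p in partials[1:]:
        out = comb_op(out, p)
    return out

df = pd.DataFrame({"x": [1.0, 2.0, 3.0, 4.0, 5.0, 6.0]})
seq_op = lambda g: (g["x"].sum(), len(g))
comb_op = lambda a, b: (a[0] + b[0], a[1] + b[1])
total, count = chunked_aggregate(df, 3, seq_op, comb_op)
# total == 21.0, count == 6, so the mean is 3.5
```

In Spark the synthetic key still forces a shuffle into the smaller groups, which is exactly the extra data movement complained about above; the sketch only illustrates why the workaround keeps each group's memory footprint bounded.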