[ 
https://issues.apache.org/jira/browse/SPARK-39931?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Enrico Minack updated SPARK-39931:
----------------------------------
    Description: 
Calling {{DataFrame.groupby(...).applyInPandas(...)}} for very small groups in 
PySpark is very slow. The reason is that for each group, PySpark creates a 
Pandas DataFrame and calls into the Python code. For very small groups this 
per-group overhead dominates; for large groups it is comparatively small.

Here is a benchmark (seconds for {{groupBy(...).applyInPandas(...)}} over 10m 
rows; a sketch of the benchmarked call follows the table):
||groupSize||Scala||pyspark.sql||pyspark.pandas||
|1024|8.9|20.9|7.8|
|512|9.4|31.8|9.8|
|256|9.3|47.0|20.2|
|128|9.5|83.3|48.8|
|64|9.5|137.8|91.9|
|32|9.6|263.6|207.3|
|16|9.6|525.9|261.5|
|8|9.5|1,043|663.0|
|4|9.8|2,073|1,168|
|2|10.4|4,132|2,456|
|1|11.3|8,162|4,642|
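
For reference, the benchmarked call has roughly the following shape. This is a 
minimal sketch, not the actual benchmark code; the column names, the 
{{group_size}} derivation and the per-group function are illustrative.
{code:python}
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder.getOrCreate()

# 10m rows with roughly `group_size` rows per distinct value of "group" (illustrative).
group_size = 16
df = spark.range(10 * 1000 * 1000).withColumn("group", (col("id") / group_size).cast("long"))

# A trivial per-group function: today each call receives exactly one group as a Pandas DataFrame.
def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.assign(id=pdf["id"] - pdf["id"].min())

result = df.groupBy("group").applyInPandas(normalize, schema="id long, group long")
result.write.format("noop").mode("overwrite").save()  # force evaluation
{code}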

*Idea to overcome this* is to call into the Python side with a Pandas DataFrame 
that potentially contains multiple groups, then perform a Pandas 
{{DataFrame.groupby(...).apply(...)}} there. With large groups, that Pandas 
DataFrame holds all rows of a single group; with small groups, it contains many 
groups, so many groups share a single Python call. This should improve 
efficiency.
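
A minimal pandas-only sketch of that Python-side step (function and variable 
names are illustrative, not actual worker code): the worker receives one batch 
that may span many groups and fans it out with a plain pandas 
{{groupby(...).apply(...)}}.
{code:python}
import pandas as pd

def apply_per_group(batch: pd.DataFrame, key_cols, user_func) -> pd.DataFrame:
    # `batch` may hold one large group or many small ones; either way Python is
    # entered once per batch instead of once per group.
    # group_keys=False keeps the result shaped like the concatenated per-group outputs.
    return batch.groupby(key_cols, group_keys=False).apply(user_func)

# Example: a single batch containing three tiny groups.
batch = pd.DataFrame({"group": [1, 1, 2, 2, 3], "value": [1.0, 2.0, 3.0, 4.0, 5.0]})
demean = lambda pdf: pdf.assign(value=pdf["value"] - pdf["value"].mean())
print(apply_per_group(batch, ["group"], demean))
{code}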

I have prepared a PoC to benchmark that idea but am struggling to massage the 
internal Rows before sending them to Python. The idea is to turn the 
{{Dataset[T]}} into {{Dataset[(K, T)]}}, create a bespoke {{GroupedIterator}} 
that splits the Dataset into chunks containing only complete groups, and send 
those chunks to Python, which then groups by key and applies the UDF to the 
values. The first bit seems to be the hard part:

{{sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala}}:
{code:scala}
    val inputRDD = ProjectExec(Seq(
        Alias(CreateNamedStruct(groupingAttributes.flatMap(a => Literal(a.name) :: a :: Nil)), "key")(),
        Alias(CreateNamedStruct(dedupAttributes.flatMap(a => Literal(a.name) :: a :: Nil)), "val")()
      ), child).execute()
{code}
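
For the Python end of that pipeline, the grouping-by-key step could look roughly 
like the sketch below. It only assumes a shape for the data arriving at the 
worker (the "key"/"val" structs modeled as prefixed pandas columns); the name 
{{flat_map_groups}}, the {{key.}}/{{val.}} prefixes and {{user_func}} are 
hypothetical and do not exist in Spark.
{code:python}
import pandas as pd

# Hypothetical worker-side step, assuming each Arrow batch carries only complete
# groups and the "key"/"val" structs from the projection above arrive flattened
# into prefixed pandas columns. How Arrow actually delivers the structs is
# exactly the open question of the PoC.
def flat_map_groups(batch: pd.DataFrame, user_func) -> pd.DataFrame:
    key_cols = [c for c in batch.columns if c.startswith("key.")]
    val_cols = [c for c in batch.columns if c.startswith("val.")]

    def run_one_group(group: pd.DataFrame) -> pd.DataFrame:
        # Strip the "val." prefix so the user function sees the original column names.
        values = group[val_cols].rename(columns=lambda c: c[len("val."):])
        return user_func(values)

    return batch.groupby(key_cols, group_keys=False).apply(run_one_group)
{code}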

> Improve performance of applyInPandas for very small groups
> ----------------------------------------------------------
>
>                 Key: SPARK-39931
>                 URL: https://issues.apache.org/jira/browse/SPARK-39931
>             Project: Spark
>          Issue Type: New Feature
>          Components: PySpark
>    Affects Versions: 3.4.0
>            Reporter: Enrico Minack
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
