This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ad1053  [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions
5ad1053 is described below

commit 5ad1053f3e8b7acab58e07e7548e7f14e192e5b4
Author: Bryan Cutler <cutl...@gmail.com>
AuthorDate: Sat Jun 22 11:20:35 2019 +0900

    [SPARK-28128][PYTHON][SQL] Pandas Grouped UDFs skip empty partitions
    
    ## What changes were proposed in this pull request?
    
    When running FlatMapGroupsInPandasExec or AggregateInPandasExec, the shuffle uses the default of 200 partitions set by "spark.sql.shuffle.partitions". If the data is small, e.g. in testing, many of the partitions will be empty but are still processed all the same.
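
    For reference, a minimal PySpark sketch of inspecting and lowering that setting (the value 8 below is only an illustration, not part of this change):

    ```python
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # The default is 200, so a tiny DataFrame with two groups leaves
    # roughly 198 shuffle partitions empty after the groupBy exchange.
    print(spark.conf.get("spark.sql.shuffle.partitions"))

    # Lowering it is the usual workaround for small data and tests.
    spark.conf.set("spark.sql.shuffle.partitions", "8")
    ```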
    
    This PR checks that the `mapPartitionsInternal` iterator is non-empty before calling `ArrowPythonRunner` to start computation on the iterator.
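
    The same guard can be sketched at the PySpark RDD level. This is only an illustration of the pattern (`expensive_setup` and `rdd` are hypothetical stand-ins; the actual Scala change is in the diff below):

    ```python
    from itertools import chain

    def process_partition(rows):
        rows = iter(rows)
        try:
            # Peek one element; StopIteration means the partition is empty.
            first = next(rows)
        except StopIteration:
            return iter([])  # empty partition: skip the setup cost entirely
        # Only non-empty partitions pay for the per-partition setup
        # (in the real change, starting an ArrowPythonRunner).
        return expensive_setup(chain([first], rows))

    result = rdd.mapPartitions(process_partition)
    ```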
    
    ## How was this patch tested?
    
    Existing tests. Ran the following benchmark, a simple example where most partitions are empty:
    
    ```python
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    from pyspark.sql.types import *
    
    df = spark.createDataFrame(
         [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
         ("id", "v"))
    
    pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
    def normalize(pdf):
        v = pdf.v
        return pdf.assign(v=(v - v.mean()) / v.std())
    
    df.groupby("id").apply(normalize).count()
    ```
    
    **Before**
    ```
    In [4]: %timeit df.groupby("id").apply(normalize).count()
    1.58 s ± 62.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    In [5]: %timeit df.groupby("id").apply(normalize).count()
    1.52 s ± 29.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    In [6]: %timeit df.groupby("id").apply(normalize).count()
    1.52 s ± 37.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    ```
    
    **After this Change**
    ```
    In [2]: %timeit df.groupby("id").apply(normalize).count()
    646 ms ± 89.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    In [3]: %timeit df.groupby("id").apply(normalize).count()
    408 ms ± 84.6 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    In [4]: %timeit df.groupby("id").apply(normalize).count()
    381 ms ± 29.9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    ```
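
    To see the skew directly, one can count the empty output partitions (a sketch using standard PySpark APIs; the exact numbers assume the default `spark.sql.shuffle.partitions` of 200):

    ```python
    grouped = df.groupby("id").apply(normalize)
    print(grouped.rdd.getNumPartitions())  # 200 with the default config

    # Partitions that received no rows: roughly 198, since only the
    # two distinct ids can map to non-empty shuffle partitions.
    print(grouped.rdd.glom().map(lambda p: len(p) == 0).sum())
    ```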
    
    Closes #24926 from BryanCutler/pyspark-pandas_udf-map-agg-skip-empty-parts-SPARK-28128.
    
    Authored-by: Bryan Cutler <cutl...@gmail.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py     | 13 +++++++++++++
 python/pyspark/sql/tests/test_pandas_udf_grouped_map.py     | 12 ++++++++++++
 .../spark/sql/execution/python/AggregateInPandasExec.scala  |  5 +++--
 .../sql/execution/python/FlatMapGroupsInPandasExec.scala    |  5 +++--
 4 files changed, 31 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
index 9eda1aa..f5fd725 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_agg.py
@@ -18,6 +18,7 @@
 import unittest
 
 from pyspark.rdd import PythonEvalType
+from pyspark.sql import Row
 from pyspark.sql.functions import array, explode, col, lit, mean, sum, \
     udf, pandas_udf, PandasUDFType
 from pyspark.sql.types import *
@@ -461,6 +462,18 @@ class GroupedAggPandasUDFTests(ReusedSQLTestCase):
         expected = [1, 5]
         self.assertEqual(actual, expected)
 
+    def test_grouped_with_empty_partition(self):
+        data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
+        expected = [Row(id=1, sum=5), Row(id=2, x=4)]
+        num_parts = len(data) + 1
+        df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts))
+
+        f = pandas_udf(lambda x: x.sum(),
+                       'int', PandasUDFType.GROUPED_AGG)
+
+        result = df.groupBy('id').agg(f(df['x']).alias('sum')).collect()
+        self.assertEqual(result, expected)
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_udf_grouped_agg import *
diff --git a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
index 1d87c63..32d6720 100644
--- a/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
+++ b/python/pyspark/sql/tests/test_pandas_udf_grouped_map.py
@@ -504,6 +504,18 @@ class GroupedMapPandasUDFTests(ReusedSQLTestCase):
 
         self.assertEquals(result.collect()[0]['sum'], 165)
 
+    def test_grouped_with_empty_partition(self):
+        data = [Row(id=1, x=2), Row(id=1, x=3), Row(id=2, x=4)]
+        expected = [Row(id=1, x=5), Row(id=1, x=5), Row(id=2, x=4)]
+        num_parts = len(data) + 1
+        df = self.spark.createDataFrame(self.sc.parallelize(data, numSlices=num_parts))
+
+        f = pandas_udf(lambda pdf: pdf.assign(x=pdf['x'].sum()),
+                       'id long, x int', PandasUDFType.GROUPED_MAP)
+
+        result = df.groupBy('id').apply(f).collect()
+        self.assertEqual(result, expected)
+
 
 if __name__ == "__main__":
     from pyspark.sql.tests.test_pandas_udf_grouped_map import *
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
index 0c78cca..fcbd0b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AggregateInPandasExec.scala
@@ -105,7 +105,8 @@ case class AggregateInPandasExec(
       StructField(s"_$i", dt)
     })
 
-    inputRDD.mapPartitionsInternal { iter =>
+    // Map grouped rows to ArrowPythonRunner results, Only execute if partition is not empty
+    inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
       val prunedProj = UnsafeProjection.create(allInputs, child.output)
 
       val grouped = if (groupingExpressions.isEmpty) {
@@ -151,6 +152,6 @@ case class AggregateInPandasExec(
         val joinedRow = joined(leftRow, aggOutputRow)
         resultProj(joinedRow)
       }
-    }
+    }}
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index 7b0e014..267698d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -125,7 +125,8 @@ case class FlatMapGroupsInPandasExec(
     val dedupAttributes = nonDupGroupingAttributes ++ dataAttributes
     val dedupSchema = StructType.fromAttributes(dedupAttributes)
 
-    inputRDD.mapPartitionsInternal { iter =>
+    // Map grouped rows to ArrowPythonRunner results, Only execute if partition is not empty
+    inputRDD.mapPartitionsInternal { iter => if (iter.isEmpty) iter else {
       val grouped = if (groupingAttributes.isEmpty) {
         Iterator(iter)
       } else {
@@ -156,6 +157,6 @@ case class FlatMapGroupsInPandasExec(
         flattenedBatch.setNumRows(batch.numRows())
         flattenedBatch.rowIterator.asScala
       }.map(unsafeProj)
-    }
+    }}
   }
 }

