This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f673ebd8afc [SPARK-39268][SQL] AttachDistributedSequenceExec should not checkpoint childRDD with a single partition
f673ebd8afc is described below

commit f673ebd8afc94a3b434a0156b61366fede80b8f9
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu May 26 12:30:25 2022 +0800

    [SPARK-39268][SQL] AttachDistributedSequenceExec should not checkpoint childRDD with a single partition
    
    ### What changes were proposed in this pull request?
    Do not checkpoint the child RDD when it only has 1 partition.
    
    ### Why are the changes needed?
    Avoid an unnecessary checkpoint.
    
    When the child RDD only has 1 partition, `zipWithIndex` does not launch a separate Spark job, so the checkpoint buys nothing.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing test suites.
    
    Closes #36648 from zhengruifeng/sql_do_not_checkpoint_with_single_partition.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/tests/test_groupby.py                   |  9 ++++++---
 .../sql/execution/python/AttachDistributedSequenceExec.scala  | 11 ++++++++---
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 045cbaf5274..ac1e73f9d5d 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -2256,9 +2256,12 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
             acc += 1
             return np.sum(x)
 
-        actual = psdf.groupby("d").apply(sum_with_acc_frame).sort_index()
+        actual = psdf.groupby("d").apply(sum_with_acc_frame)
         actual.columns = ["d", "v"]
-        self.assert_eq(actual, pdf.groupby("d").apply(sum).sort_index().reset_index(drop=True))
+        self.assert_eq(
+            actual.to_pandas().sort_index(),
+            pdf.groupby("d").apply(sum).sort_index().reset_index(drop=True),
+        )
         self.assert_eq(acc.value, 2)
 
         def sum_with_acc_series(x) -> np.float64:
@@ -2267,7 +2270,7 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
             return np.sum(x)
 
         self.assert_eq(
-            psdf.groupby("d")["v"].apply(sum_with_acc_series).sort_index(),
+            psdf.groupby("d")["v"].apply(sum_with_acc_series).to_pandas().sort_index(),
             pdf.groupby("d")["v"].apply(sum).sort_index().reset_index(drop=True),
         )
         self.assert_eq(acc.value, 4)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
index 203fb6d7d50..5f722826fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
@@ -41,9 +41,14 @@ case class AttachDistributedSequenceExec(
   override def outputPartitioning: Partitioning = child.outputPartitioning
 
   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().map(_.copy())
-        .localCheckpoint() // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-        .zipWithIndex().mapPartitions { iter =>
+    val childRDD = child.execute().map(_.copy())
+    val checkpointed = if (childRDD.getNumPartitions > 1) {
+      // to avoid executing multiple jobs: zipWithIndex launches a Spark job.
+      childRDD.localCheckpoint()
+    } else {
+      childRDD
+    }
+    checkpointed.zipWithIndex().mapPartitions { iter =>
       val unsafeProj = UnsafeProjection.create(output, output)
       val joinedRow = new JoinedRow
       val unsafeRowWriter =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
