This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f673ebd8afc [SPARK-39268][SQL] AttachDistributedSequenceExec do not checkpoint childRDD with single partition
f673ebd8afc is described below

commit f673ebd8afc94a3b434a0156b61366fede80b8f9
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu May 26 12:30:25 2022 +0800

    [SPARK-39268][SQL] AttachDistributedSequenceExec do not checkpoint childRDD with single partition

    ### What changes were proposed in this pull request?
    Do not checkpoint the child RDD when it has only 1 partition.

    ### Why are the changes needed?
    To avoid an unnecessary checkpoint when the child RDD has only 1 partition: in that case `zipWithIndex` does not launch an extra job, so there is no recomputation for the checkpoint to guard against.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing test suites.

    Closes #36648 from zhengruifeng/sql_do_not_checkpoint_with_single_partition.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/tests/test_groupby.py                  |  9 ++++++---
 .../sql/execution/python/AttachDistributedSequenceExec.scala | 11 ++++++++---
 2 files changed, 14 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py
index 045cbaf5274..ac1e73f9d5d 100644
--- a/python/pyspark/pandas/tests/test_groupby.py
+++ b/python/pyspark/pandas/tests/test_groupby.py
@@ -2256,9 +2256,12 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
             acc += 1
             return np.sum(x)

-        actual = psdf.groupby("d").apply(sum_with_acc_frame).sort_index()
+        actual = psdf.groupby("d").apply(sum_with_acc_frame)
         actual.columns = ["d", "v"]
-        self.assert_eq(actual, pdf.groupby("d").apply(sum).sort_index().reset_index(drop=True))
+        self.assert_eq(
+            actual.to_pandas().sort_index(),
+            pdf.groupby("d").apply(sum).sort_index().reset_index(drop=True),
+        )
         self.assert_eq(acc.value, 2)

         def sum_with_acc_series(x) -> np.float64:
@@ -2267,7 +2270,7 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils):
             return np.sum(x)

         self.assert_eq(
-            psdf.groupby("d")["v"].apply(sum_with_acc_series).sort_index(),
+            psdf.groupby("d")["v"].apply(sum_with_acc_series).to_pandas().sort_index(),
             pdf.groupby("d")["v"].apply(sum).sort_index().reset_index(drop=True),
         )
         self.assert_eq(acc.value, 4)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
index 203fb6d7d50..5f722826fc7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/AttachDistributedSequenceExec.scala
@@ -41,9 +41,14 @@ case class AttachDistributedSequenceExec(
   override def outputPartitioning: Partitioning = child.outputPartitioning

   override protected def doExecute(): RDD[InternalRow] = {
-    child.execute().map(_.copy())
-      .localCheckpoint() // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
-      .zipWithIndex().mapPartitions { iter =>
+    val childRDD = child.execute().map(_.copy())
+    val checkpointed = if (childRDD.getNumPartitions > 1) {
+      // to avoid execute multiple jobs. zipWithIndex launches a Spark job.
+      childRDD.localCheckpoint()
+    } else {
+      childRDD
+    }
+    checkpointed.zipWithIndex().mapPartitions { iter =>
       val unsafeProj = UnsafeProjection.create(output, output)
       val joinedRow = new JoinedRow
       val unsafeRowWriter =

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
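For context, here is a minimal standalone sketch of the pattern this commit applies inside AttachDistributedSequenceExec, rewritten against the public RDD API. Only getNumPartitions, localCheckpoint() and zipWithIndex() come from Spark itself; the object and method names (ConditionalCheckpointSketch, attachIndex) and the sample data are illustrative, not part of the patch.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ConditionalCheckpointSketch {

  // zipWithIndex needs the element count of every partition except the last
  // to compute per-partition index offsets, so it launches an extra Spark job
  // whenever the RDD has more than one partition. Checkpointing first keeps
  // that counting job and the final action from both recomputing the parent
  // lineage. With a single partition no counting job runs, so the checkpoint
  // is skipped.
  def attachIndex[T](rdd: RDD[T]): RDD[(T, Long)] = {
    val maybeCheckpointed =
      if (rdd.getNumPartitions > 1) rdd.localCheckpoint() else rdd
    maybeCheckpointed.zipWithIndex()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("conditional-checkpoint-sketch")
      .getOrCreate()
    try {
      val sc = spark.sparkContext
      // One partition: the RDD is indexed as-is, with no checkpoint.
      val single = sc.parallelize(Seq("a", "b", "c"), numSlices = 1)
      println(attachIndex(single).collect().mkString(", "))
      // Two partitions: the RDD is checkpointed first, as the patched code does.
      val multi = sc.parallelize(Seq("a", "b", "c", "d"), numSlices = 2)
      println(attachIndex(multi).collect().mkString(", "))
    } finally {
      spark.stop()
    }
  }
}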