This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new da0c7cc81bb3 [SPARK-48037][CORE][3.4] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data
da0c7cc81bb3 is described below

commit da0c7cc81bb3d69d381dd0683e910eae4c80e9ae
Author: sychen <syc...@ctrip.com>
AuthorDate: Wed May 8 07:30:21 2024 -0700

    [SPARK-48037][CORE][3.4] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data
    
    ### What changes were proposed in this pull request?
    This PR aims to fix the issue that SortShuffleWriter lacks shuffle write related metrics, which can result in inaccurate data.
    
    ### Why are the changes needed?
    When the shuffle writer is SortShuffleWriter, it does not use SQLShuffleWriteMetricsReporter to update metrics, so when AQE collects runtime statistics the rowCount it obtains is 0.
    
    Some optimization rules, such as `EliminateLimits`, rely on rowCount statistics. Because rowCount is 0, the rule removes the limit operator, and the query then returns results without the limit applied.
    
    
https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L168-L172
    
    
https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2067-L2070
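
    As a minimal, hypothetical repro sketch (not part of this commit; names and config values are assumed), the snippet below mirrors the new regression test: with AQE enabled and `spark.sql.shuffle.partitions` raised above `SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE`, the shuffle falls back to SortShuffleWriter, and before this fix the LIMIT below could be optimized away because the shuffle stage reported rowCount = 0.

    ```scala
    // Hypothetical repro sketch: assumes local execution and that setting
    // spark.sql.shuffle.partitions above 16777216 (the assumed value of
    // SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE)
    // selects SortShuffleWriter for the shuffle map stage.
    import org.apache.spark.sql.SparkSession

    object Spark48037Repro {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[2]")
          .config("spark.sql.adaptive.enabled", "true")
          .config("spark.sql.shuffle.partitions", "16777217")
          .getOrCreate()

        spark.range(2).createOrReplaceTempView("t3")
        // Expected: at most one row. Before this fix, EliminateLimits could drop
        // the LIMIT because the shuffle stage's rowCount statistic was 0.
        spark.sql("SELECT id, count(*) FROM t3 GROUP BY id LIMIT 1").show()

        spark.stop()
      }
    }
    ```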
    
    ### Does this PR introduce _any_ user-facing change?
    Yes
    
    ### How was this patch tested?
    Production environment verification.
    
    **master metrics**
    <img width="296" alt="image" 
src="https://github.com/apache/spark/assets/3898450/dc9b6e8a-93ec-4f59-a903-71aa5b11962c";>
    
    **PR metrics**
    
    <img width="276" alt="image" 
src="https://github.com/apache/spark/assets/3898450/2d73b773-2dcc-4d23-81de-25dcadac86c1";>
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #46464 from cxzl25/SPARK-48037-3.4.
    
    Authored-by: sychen <syc...@ctrip.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .github/workflows/build_and_test.yml               |  1 +
 .../spark/shuffle/sort/SortShuffleManager.scala    |  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala     |  6 ++--
 .../spark/util/collection/ExternalSorter.scala     |  9 +++---
 .../shuffle/sort/SortShuffleWriterSuite.scala      |  3 ++
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  3 +-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 32 ++++++++++++++++++++--
 7 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 8ae303178033..2d2e8da80d46 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -644,6 +644,7 @@ jobs:
        python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 'sphinx-copybutton==0.5.2' nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' 'nest-asyncio==1.5.8' 'rpds-py==0.16.2' 'alabaster==0.7.13'
         python3.9 -m pip install ipython_genutils # See SPARK-38517
        python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas 'plotly>=4.8'
+        python3.9 -m pip install 'nbsphinx==0.9.3'
         python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
         apt-get update -y
         apt-get install -y ruby ruby-dev
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 46aca07ce43f..79dff6f87534 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -176,7 +176,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
           metrics,
           shuffleExecutorComponents)
       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-        new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
+        new SortShuffleWriter(other, mapId, context, metrics, shuffleExecutorComponents)
     }
   }
 
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 8613fe11a4c2..3be7d24f7e4e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -21,6 +21,7 @@ import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.util.collection.ExternalSorter
 
@@ -28,6 +29,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     handle: BaseShuffleHandle[K, V, C],
     mapId: Long,
     context: TaskContext,
+    writeMetrics: ShuffleWriteMetricsReporter,
     shuffleExecutorComponents: ShuffleExecutorComponents)
   extends ShuffleWriter[K, V] with Logging {
 
@@ -46,8 +48,6 @@ private[spark] class SortShuffleWriter[K, V, C](
 
   private var partitionLengths: Array[Long] = _
 
-  private val writeMetrics = context.taskMetrics().shuffleWriteMetrics
-
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[Product2[K, V]]): Unit = {
     sorter = if (dep.mapSideCombine) {
@@ -67,7 +67,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     // (see SPARK-3570).
     val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
       dep.shuffleId, mapId, dep.partitioner.numPartitions)
-    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
+    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter, writeMetrics)
     partitionLengths = mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths
     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 4ca838b7655c..28b874918580 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.serializer._
-import org.apache.spark.shuffle.ShufflePartitionPairsWriter
+import org.apache.spark.shuffle.{ShufflePartitionPairsWriter, ShuffleWriteMetricsReporter}
 import org.apache.spark.shuffle.api.{ShuffleMapOutputWriter, ShufflePartitionWriter}
 import org.apache.spark.shuffle.checksum.ShuffleChecksumSupport
 import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter, ShuffleBlockId}
@@ -696,7 +696,8 @@ private[spark] class ExternalSorter[K, V, C](
   def writePartitionedMapOutput(
       shuffleId: Int,
       mapId: Long,
-      mapOutputWriter: ShuffleMapOutputWriter): Unit = {
+      mapOutputWriter: ShuffleMapOutputWriter,
+      writeMetrics: ShuffleWriteMetricsReporter): Unit = {
     var nextPartitionId = 0
     if (spills.isEmpty) {
       // Case where we only have in-memory data
@@ -714,7 +715,7 @@ private[spark] class ExternalSorter[K, V, C](
             serializerManager,
             serInstance,
             blockId,
-            context.taskMetrics().shuffleWriteMetrics,
+            writeMetrics,
            if (partitionChecksums.nonEmpty) partitionChecksums(partitionId) else null)
           while (it.hasNext && it.nextPartition() == partitionId) {
             it.writeNext(partitionPairsWriter)
@@ -739,7 +740,7 @@ private[spark] class ExternalSorter[K, V, C](
             serializerManager,
             serInstance,
             blockId,
-            context.taskMetrics().shuffleWriteMetrics,
+            writeMetrics,
             if (partitionChecksums.nonEmpty) partitionChecksums(id) else null)
           if (elements.hasNext) {
             for (elem <- elements) {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
index 9e52b5e15143..99402abb16ca 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
@@ -85,6 +85,7 @@ class SortShuffleWriterSuite
       shuffleHandle,
       mapId = 1,
       context,
+      context.taskMetrics().shuffleWriteMetrics,
       shuffleExecutorComponents)
     writer.write(Iterator.empty)
     writer.stop(success = true)
@@ -102,6 +103,7 @@ class SortShuffleWriterSuite
       shuffleHandle,
       mapId = 2,
       context,
+      context.taskMetrics().shuffleWriteMetrics,
       shuffleExecutorComponents)
     writer.write(records.iterator)
     writer.stop(success = true)
@@ -158,6 +160,7 @@ class SortShuffleWriterSuite
         shuffleHandle,
         mapId = 0,
         context,
+        context.taskMetrics().shuffleWriteMetrics,
         new LocalDiskShuffleExecutorComponents(
           conf, shuffleBlockResolver._blockManager, shuffleBlockResolver))
       writer.write(records.iterator)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index d94934210615..928d732f2a16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -130,7 +130,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
     assert(sorter.numSpills > 0)
 
     // Merging spilled files should not throw assertion error
-    sorter.writePartitionedMapOutput(0, 0, mapOutputWriter)
+    sorter.writePartitionedMapOutput(0, 0, mapOutputWriter,
+      taskContext.taskMetrics.shuffleWriteMetrics)
   }
 
   test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 593bd7bb4bac..71390a63c753 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
@@ -59,7 +60,8 @@ class AdaptiveQueryExecSuite
 
   setupTestData()
 
-  private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, SparkPlan) = {
+  private def runAdaptiveAndVerifyResult(query: String,
+      skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
     var finalPlanCnt = 0
     val listener = new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -80,8 +82,10 @@ class AdaptiveQueryExecSuite
     assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
     val result = dfAdaptive.collect()
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-      val df = sql(query)
-      checkAnswer(df, result)
+      if (!skipCheckAnswer) {
+        val df = sql(query)
+        checkAnswer(df, result)
+      }
     }
     val planAfter = dfAdaptive.queryExecution.executedPlan
     assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
@@ -2390,6 +2394,28 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-48037: Fix SortShuffleWriter lacks shuffle write related metrics 
" +
+    "resulting in potentially inaccurate data") {
+    withTable("t3") {
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.SHUFFLE_PARTITIONS.key -> (SortShuffleManager
+          .MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE + 1).toString) {
+        sql("CREATE TABLE t3 USING PARQUET AS SELECT id FROM range(2)")
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+          """
+            |SELECT id, count(*)
+            |FROM t3
+            |GROUP BY id
+            |LIMIT 1
+            |""".stripMargin, skipCheckAnswer = true)
+        // The shuffle stage produces two rows and the limit operator should not be optimized out.
+        assert(findTopLevelLimit(plan).size == 1)
+        assert(findTopLevelLimit(adaptivePlan).size == 1)
+      }
+    }
+  }
+
   test("SPARK-37063: OptimizeSkewInRebalancePartitions support optimize 
non-root node") {
     withTempView("v") {
       withSQLConf(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
