This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new da0c7cc81bb3 [SPARK-48037][CORE][3.4] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data

da0c7cc81bb3 is described below

commit da0c7cc81bb3d69d381dd0683e910eae4c80e9ae
Author: sychen <syc...@ctrip.com>
AuthorDate: Wed May 8 07:30:21 2024 -0700

    [SPARK-48037][CORE][3.4] Fix SortShuffleWriter lacks shuffle write related metrics resulting in potentially inaccurate data

    ### What changes were proposed in this pull request?
    This PR aims to fix SortShuffleWriter's lack of shuffle write related metrics, which can result in inaccurate data.

    ### Why are the changes needed?
    When the shuffle writer is SortShuffleWriter, it does not use SQLShuffleWriteMetricsReporter to update metrics, so the rowCount in the runtime statistics that AQE obtains is 0. Some optimization rules, such as `EliminateLimits`, rely on rowCount statistics. Because rowCount is 0, the rule removes the limit operator, and the query returns results without the limit applied.

    https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala#L168-L172

    https://github.com/apache/spark/blob/59d5946cfd377e9203ccf572deb34f87fab7510c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L2067-L2070

    ### Does this PR introduce _any_ user-facing change?
    Yes

    ### How was this patch tested?
    Production environment verification.

    **master metrics**
    <img width="296" alt="image" src="https://github.com/apache/spark/assets/3898450/dc9b6e8a-93ec-4f59-a903-71aa5b11962c">

    **PR metrics**
    <img width="276" alt="image" src="https://github.com/apache/spark/assets/3898450/2d73b773-2dcc-4d23-81de-25dcadac86c1">

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #46464 from cxzl25/SPARK-48037-3.4.
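For readers unfamiliar with the mechanism: `ShuffleExchangeExec` derives its runtime row count from the SQL shuffle write metrics, and limit elimination drops a `Limit` whose child provably emits no more rows than the limit. The sketch below is a simplified, self-contained illustration of that interplay, not the actual Spark code at the links above; `Spark48037Sketch`, the `Statistics` stand-in, and `canEliminateLimit` are names invented here.

```scala
// A minimal sketch (plain Scala, not Spark's real classes) of how a zero
// rowCount lets limit elimination misfire.
object Spark48037Sketch extends App {
  // Stand-in for Spark's logical Statistics.
  case class Statistics(sizeInBytes: BigInt, rowCount: Option[BigInt])

  // ShuffleExchangeExec reports runtime statistics from its SQL write
  // metrics; if SortShuffleWriter never updates them, recordsWritten stays 0.
  def runtimeStatistics(dataSize: Long, recordsWritten: Long): Statistics =
    Statistics(BigInt(dataSize), Some(BigInt(recordsWritten)))

  // In the spirit of EliminateLimits: a Limit is redundant when statistics
  // prove the child emits no more than `limit` rows.
  def canEliminateLimit(limit: Int, childStats: Statistics): Boolean =
    childStats.rowCount.exists(_ <= limit)

  // Bug: metrics never updated, rowCount = 0, so LIMIT 1 is "safely" dropped.
  assert(canEliminateLimit(1, runtimeStatistics(dataSize = 1024, recordsWritten = 0)))
  // Fix: the true count (2 rows, as in the new test) keeps the Limit in place.
  assert(!canEliminateLimit(1, runtimeStatistics(dataSize = 1024, recordsWritten = 2)))
}
```

The patch accordingly threads the writer's `ShuffleWriteMetricsReporter` (the SQL reporter under SQL workloads) from `SortShuffleManager` through `SortShuffleWriter` into `ExternalSorter`, instead of always writing to `context.taskMetrics().shuffleWriteMetrics`.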
Authored-by: sychen <syc...@ctrip.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .github/workflows/build_and_test.yml               |  1 +
 .../spark/shuffle/sort/SortShuffleManager.scala    |  2 +-
 .../spark/shuffle/sort/SortShuffleWriter.scala     |  6 ++--
 .../spark/util/collection/ExternalSorter.scala     |  9 +++---
 .../shuffle/sort/SortShuffleWriterSuite.scala      |  3 ++
 .../sql/execution/UnsafeRowSerializerSuite.scala   |  3 +-
 .../adaptive/AdaptiveQueryExecSuite.scala          | 32 ++++++++++++++++++++--
 7 files changed, 44 insertions(+), 12 deletions(-)

diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml
index 8ae303178033..2d2e8da80d46 100644
--- a/.github/workflows/build_and_test.yml
+++ b/.github/workflows/build_and_test.yml
@@ -644,6 +644,7 @@ jobs:
         python3.9 -m pip install 'sphinx<3.1.0' mkdocs pydata_sphinx_theme 'sphinx-copybutton==0.5.2' nbsphinx numpydoc 'jinja2<3.0.0' 'markupsafe==2.0.1' 'pyzmq<24.0.0' 'sphinxcontrib-applehelp==1.0.4' 'sphinxcontrib-devhelp==1.0.2' 'sphinxcontrib-htmlhelp==2.0.1' 'sphinxcontrib-qthelp==1.0.3' 'sphinxcontrib-serializinghtml==1.1.5' 'nest-asyncio==1.5.8' 'rpds-py==0.16.2' 'alabaster==0.7.13'
         python3.9 -m pip install ipython_genutils # See SPARK-38517
         python3.9 -m pip install sphinx_plotly_directive 'numpy>=1.20.0' 'pyarrow==12.0.1' pandas 'plotly>=4.8'
+        python3.9 -m pip install 'nbsphinx==0.9.3'
         python3.9 -m pip install 'docutils<0.18.0' # See SPARK-39421
         apt-get update -y
         apt-get install -y ruby ruby-dev
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
index 46aca07ce43f..79dff6f87534 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala
@@ -176,7 +176,7 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager
           metrics,
           shuffleExecutorComponents)
       case other: BaseShuffleHandle[K @unchecked, V @unchecked, _] =>
-        new SortShuffleWriter(other, mapId, context, shuffleExecutorComponents)
+        new SortShuffleWriter(other, mapId, context, metrics, shuffleExecutorComponents)
     }
   }
diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
index 8613fe11a4c2..3be7d24f7e4e 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleWriter.scala
@@ -21,6 +21,7 @@ import org.apache.spark._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.scheduler.MapStatus
 import org.apache.spark.shuffle.{BaseShuffleHandle, ShuffleWriter}
+import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
 import org.apache.spark.shuffle.api.ShuffleExecutorComponents
 import org.apache.spark.util.collection.ExternalSorter
 
@@ -28,6 +29,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     handle: BaseShuffleHandle[K, V, C],
     mapId: Long,
     context: TaskContext,
+    writeMetrics: ShuffleWriteMetricsReporter,
     shuffleExecutorComponents: ShuffleExecutorComponents)
   extends ShuffleWriter[K, V] with Logging {
 
@@ -46,8 +48,6 @@ private[spark] class SortShuffleWriter[K, V, C](
 
   private var partitionLengths: Array[Long] = _
 
-  private val writeMetrics = context.taskMetrics().shuffleWriteMetrics
-
   /** Write a bunch of records to this task's output */
   override def write(records: Iterator[Product2[K, V]]): Unit = {
     sorter = if (dep.mapSideCombine) {
@@ -67,7 +67,7 @@ private[spark] class SortShuffleWriter[K, V, C](
     // (see SPARK-3570).
     val mapOutputWriter = shuffleExecutorComponents.createMapOutputWriter(
       dep.shuffleId, mapId, dep.partitioner.numPartitions)
-    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter)
+    sorter.writePartitionedMapOutput(dep.shuffleId, mapId, mapOutputWriter, writeMetrics)
     partitionLengths = mapOutputWriter.commitAllPartitions(sorter.getChecksums).getPartitionLengths
     mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths, mapId)
   }
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index 4ca838b7655c..28b874918580 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
 import org.apache.spark.executor.ShuffleWriteMetrics
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.serializer._
-import org.apache.spark.shuffle.ShufflePartitionPairsWriter
+import org.apache.spark.shuffle.{ShufflePartitionPairsWriter, ShuffleWriteMetricsReporter}
 import org.apache.spark.shuffle.api.{ShuffleMapOutputWriter, ShufflePartitionWriter}
 import org.apache.spark.shuffle.checksum.ShuffleChecksumSupport
 import org.apache.spark.storage.{BlockId, DiskBlockObjectWriter, ShuffleBlockId}
@@ -696,7 +696,8 @@ private[spark] class ExternalSorter[K, V, C](
   def writePartitionedMapOutput(
       shuffleId: Int,
       mapId: Long,
-      mapOutputWriter: ShuffleMapOutputWriter): Unit = {
+      mapOutputWriter: ShuffleMapOutputWriter,
+      writeMetrics: ShuffleWriteMetricsReporter): Unit = {
     var nextPartitionId = 0
     if (spills.isEmpty) {
       // Case where we only have in-memory data
@@ -714,7 +715,7 @@ private[spark] class ExternalSorter[K, V, C](
             serializerManager,
             serInstance,
             blockId,
-            context.taskMetrics().shuffleWriteMetrics,
+            writeMetrics,
             if (partitionChecksums.nonEmpty) partitionChecksums(partitionId) else null)
           while (it.hasNext && it.nextPartition() == partitionId) {
             it.writeNext(partitionPairsWriter)
@@ -739,7 +740,7 @@ private[spark] class ExternalSorter[K, V, C](
             serializerManager,
             serInstance,
             blockId,
-            context.taskMetrics().shuffleWriteMetrics,
+            writeMetrics,
             if (partitionChecksums.nonEmpty) partitionChecksums(id) else null)
           if (elements.hasNext) {
             for (elem <- elements) {
diff --git a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
index 9e52b5e15143..99402abb16ca 100644
--- a/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/shuffle/sort/SortShuffleWriterSuite.scala
@@ -85,6 +85,7 @@ class SortShuffleWriterSuite
       shuffleHandle,
       mapId = 1,
       context,
+      context.taskMetrics().shuffleWriteMetrics,
       shuffleExecutorComponents)
     writer.write(Iterator.empty)
     writer.stop(success = true)
@@ -102,6 +103,7 @@ class SortShuffleWriterSuite
       shuffleHandle,
      mapId = 2,
       context,
+      context.taskMetrics().shuffleWriteMetrics,
       shuffleExecutorComponents)
     writer.write(records.iterator)
     writer.stop(success = true)
@@ -158,6 +160,7 @@ class SortShuffleWriterSuite
         shuffleHandle,
         mapId = 0,
         context,
+        context.taskMetrics().shuffleWriteMetrics,
         new LocalDiskShuffleExecutorComponents(
           conf, shuffleBlockResolver._blockManager, shuffleBlockResolver))
     writer.write(records.iterator)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index d94934210615..928d732f2a16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -130,7 +130,8 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
     assert(sorter.numSpills > 0)
 
     // Merging spilled files should not throw assertion error
-    sorter.writePartitionedMapOutput(0, 0, mapOutputWriter)
+    sorter.writePartitionedMapOutput(0, 0, mapOutputWriter,
+      taskContext.taskMetrics.shuffleWriteMetrics)
   }
 
   test("SPARK-10403: unsafe row serializer with SortShuffleManager") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 593bd7bb4bac..71390a63c753 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListenerJobStart}
+import org.apache.spark.shuffle.sort.SortShuffleManager
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
@@ -59,7 +60,8 @@ class AdaptiveQueryExecSuite
 
   setupTestData()
 
-  private def runAdaptiveAndVerifyResult(query: String): (SparkPlan, SparkPlan) = {
+  private def runAdaptiveAndVerifyResult(query: String,
+      skipCheckAnswer: Boolean = false): (SparkPlan, SparkPlan) = {
     var finalPlanCnt = 0
     val listener = new SparkListener {
       override def onOtherEvent(event: SparkListenerEvent): Unit = {
@@ -80,8 +82,10 @@ class AdaptiveQueryExecSuite
     assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
     val result = dfAdaptive.collect()
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
-      val df = sql(query)
-      checkAnswer(df, result)
+      if (!skipCheckAnswer) {
+        val df = sql(query)
+        checkAnswer(df, result)
+      }
     }
     val planAfter = dfAdaptive.queryExecution.executedPlan
     assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
@@ -2390,6 +2394,28 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-48037: Fix SortShuffleWriter lacks shuffle write related metrics " +
+    "resulting in potentially inaccurate data") {
+    withTable("t3") {
+      withSQLConf(
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+        SQLConf.SHUFFLE_PARTITIONS.key -> (SortShuffleManager
+          .MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE + 1).toString) {
+        sql("CREATE TABLE t3 USING PARQUET AS SELECT id FROM range(2)")
+        val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
+          """
+            |SELECT id, count(*)
+            |FROM t3
+            |GROUP BY id
+            |LIMIT 1
+            |""".stripMargin, skipCheckAnswer = true)
+        // The shuffle stage produces two rows and the limit operator should not be optimized out.
+        assert(findTopLevelLimit(plan).size == 1)
+        assert(findTopLevelLimit(adaptivePlan).size == 1)
+      }
+    }
+  }
+
   test("SPARK-37063: OptimizeSkewInRebalancePartitions support optimize non-root node") {
     withTempView("v") {
       withSQLConf(
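To exercise the fixed path end to end, the new test can be mirrored interactively. This is a hypothetical spark-shell session, not part of the patch: it assumes the shell's usual `spark` SparkSession, and sets one more shuffle partition than the serialized (unsafe) shuffle path supports so that the plain `SortShuffleWriter` is chosen, as in the test above.

```scala
import org.apache.spark.shuffle.sort.SortShuffleManager

spark.conf.set("spark.sql.adaptive.enabled", "true")
// Exceed the serialized-shuffle partition limit to force SortShuffleWriter,
// the writer this patch instruments.
spark.conf.set("spark.sql.shuffle.partitions",
  (SortShuffleManager.MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE + 1).toString)

spark.sql("CREATE TABLE t3 USING PARQUET AS SELECT id FROM range(2)")
val df = spark.sql("SELECT id, count(*) FROM t3 GROUP BY id LIMIT 1")

// With the fix, the shuffle records-written metric is non-zero, so
// EliminateLimits keeps the Limit and exactly one row comes back.
assert(df.collect().length == 1)
println(df.queryExecution.executedPlan) // the plan should still show a Limit node
```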