This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new db7b865  [SPARK-31253][SQL][FOLLOW-UP] simplify the code of calculating size metrics of AQE shuffle
db7b865 is described below

commit db7b8651a19d5a749a9f0b4e8fb517e6994921c2
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Fri Apr 17 13:20:34 2020 -0700

    [SPARK-31253][SQL][FOLLOW-UP] simplify the code of calculating size metrics of AQE shuffle

    ### What changes were proposed in this pull request?

    A followup of https://github.com/apache/spark/pull/28175:
    1. use a mutable collection to store the driver metrics
    2. don't send size metrics if there are no map stats, as the UI will display the size as 0 when there is no data
    3. calculate the partition data sizes separately, to make the code easier to read

    ### Why are the changes needed?

    code simplification

    ### Does this PR introduce any user-facing change?

    no

    ### How was this patch tested?

    existing tests

    Closes #28240 from cloud-fan/refactor.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Xingbo Jiang <xingbo.ji...@databricks.com>
---
 .../adaptive/CustomShuffleReaderExec.scala         | 50 ++++++++++------------
 1 file changed, 22 insertions(+), 28 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
index 68f20bc..6450d49 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/CustomShuffleReaderExec.scala
@@ -97,14 +97,27 @@ case class CustomShuffleReaderExec private(
     case _ => None
   }
 
+  @transient private lazy val partitionDataSizes: Option[Seq[Long]] = {
+    if (!isLocalReader && shuffleStage.get.mapStats.isDefined) {
+      val bytesByPartitionId = shuffleStage.get.mapStats.get.bytesByPartitionId
+      Some(partitionSpecs.map {
+        case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
+          startReducerIndex.until(endReducerIndex).map(bytesByPartitionId).sum
+        case p: PartialReducerPartitionSpec => p.dataSize
+        case p => throw new IllegalStateException("unexpected " + p)
+      })
+    } else {
+      None
+    }
+  }
+
   private def sendDriverMetrics(): Unit = {
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    var driverAccumUpdates: Seq[(Long, Long)] = Seq.empty
+    val driverAccumUpdates = ArrayBuffer.empty[(Long, Long)]
 
     val numPartitionsMetric = metrics("numPartitions")
     numPartitionsMetric.set(partitionSpecs.length)
-    driverAccumUpdates = driverAccumUpdates :+
-      (numPartitionsMetric.id, partitionSpecs.length.toLong)
+    driverAccumUpdates += (numPartitionsMetric.id -> partitionSpecs.length.toLong)
 
     if (hasSkewedPartition) {
       val skewedMetric = metrics("numSkewedPartitions")
@@ -112,33 +125,14 @@ case class CustomShuffleReaderExec private(
         case p: PartialReducerPartitionSpec => p.reducerIndex
       }.distinct.length
       skewedMetric.set(numSkewedPartitions)
-      driverAccumUpdates = driverAccumUpdates :+ (skewedMetric.id, numSkewedPartitions.toLong)
+      driverAccumUpdates += (skewedMetric.id -> numSkewedPartitions.toLong)
     }
 
-    if(!isLocalReader) {
-      val partitionMetrics = metrics("partitionDataSize")
-      val mapStats = shuffleStage.get.mapStats
-
-      if (mapStats.isEmpty) {
-        partitionMetrics.set(0)
-        driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, 0L)
-      } else {
-        var sum = 0L
-        partitionSpecs.foreach {
-          case CoalescedPartitionSpec(startReducerIndex, endReducerIndex) =>
-            val dataSize = startReducerIndex.until(endReducerIndex).map(
-              mapStats.get.bytesByPartitionId(_)).sum
-            driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, dataSize)
-            sum += dataSize
-          case p: PartialReducerPartitionSpec =>
-            driverAccumUpdates = driverAccumUpdates :+ (partitionMetrics.id, p.dataSize)
-            sum += p.dataSize
-          case p => throw new IllegalStateException("unexpected " + p)
-        }
-
-        // Set sum value to "partitionDataSize" metric.
-        partitionMetrics.set(sum)
-      }
+    partitionDataSizes.foreach { dataSizes =>
+      val partitionDataSizeMetrics = metrics("partitionDataSize")
+      driverAccumUpdates ++= dataSizes.map(partitionDataSizeMetrics.id -> _)
+      // Set sum value to "partitionDataSize" metric.
+      partitionDataSizeMetrics.set(dataSizes.sum)
     }
 
     SQLMetrics.postDriverMetricsUpdatedByValue(sparkContext, executionId, driverAccumUpdates)
   }
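
To illustrate the refactored logic outside of Spark, here is a minimal, self-contained Scala sketch of the same pattern: sum bytesByPartitionId over each coalesced reducer range, take dataSize directly for partial (skew-split) partitions, and collect the per-partition (metricId, value) pairs in a mutable buffer. The PartitionSpec stand-ins, the sample sizes, and the metric id below are simplified assumptions for illustration, not Spark's actual classes.

import scala.collection.mutable.ArrayBuffer

object PartitionDataSizeSketch {
  // Hypothetical stand-ins for Spark's shuffle partition specs (illustration only).
  sealed trait PartitionSpec
  case class CoalescedPartitionSpec(startReducerIndex: Int, endReducerIndex: Int) extends PartitionSpec
  case class PartialReducerPartitionSpec(reducerIndex: Int, dataSize: Long) extends PartitionSpec

  // Stand-in for MapOutputStatistics.bytesByPartitionId: output bytes per reducer partition.
  val bytesByPartitionId: Array[Long] = Array(10L, 20L, 30L, 40L)

  val partitionSpecs: Seq[PartitionSpec] = Seq(
    CoalescedPartitionSpec(0, 2),         // coalesces reducers 0 and 1
    PartialReducerPartitionSpec(2, 15L),  // one split of a skewed reducer partition
    CoalescedPartitionSpec(3, 4))         // reducer 3 alone

  // Same shape as the new partitionDataSizes: one size per output partition of the reader.
  val partitionDataSizes: Seq[Long] = partitionSpecs.map {
    case CoalescedPartitionSpec(start, end) => start.until(end).map(bytesByPartitionId).sum
    case p: PartialReducerPartitionSpec => p.dataSize
  }

  def main(args: Array[String]): Unit = {
    // Same shape as the rewritten sendDriverMetrics: accumulate (metricId, value) pairs mutably.
    val partitionDataSizeMetricId = 42L // hypothetical metric id
    val driverAccumUpdates = ArrayBuffer.empty[(Long, Long)]
    driverAccumUpdates ++= partitionDataSizes.map(partitionDataSizeMetricId -> _)

    println(partitionDataSizes)  // List(30, 15, 40)
    println(driverAccumUpdates)  // ArrayBuffer((42,30), (42,15), (42,40))
  }
}

Accumulating into an ArrayBuffer and posting the updates once at the end mirrors the commit's goal of avoiding the repeated immutable-Seq appends used by the old code.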
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org