[ https://issues.apache.org/jira/browse/SPARK-26193?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16718368#comment-16718368 ]

ASF GitHub Bot commented on SPARK-26193:
----------------------------------------

asfgit closed pull request #23286: [SPARK-26193][SQL][Follow Up] Read metrics rename and display text changes
URL: https://github.com/apache/spark/pull/23286
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
index 2a8d1dd995e27..35664ff515d4b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapTask.scala
@@ -92,7 +92,7 @@ private[spark] class ShuffleMapTask(
       threadMXBean.getCurrentThreadCpuTime - deserializeStartCpuTime
     } else 0L
 
-    dep.shuffleWriterProcessor.writeProcess(rdd, dep, partitionId, context, partition)
+    dep.shuffleWriterProcessor.write(rdd, dep, partitionId, context, partition)
   }
 
   override def preferredLocations: Seq[TaskLocation] = preferredLocs
diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
index f5213157a9a85..5b0c7e9f2b0b4 100644
--- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
+++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleWriteProcessor.scala
@@ -41,7 +41,7 @@ private[spark] class ShuffleWriteProcessor extends Serializable with Logging {
   * get from [[ShuffleManager]] and triggers rdd compute, finally return the [[MapStatus]] for
    * this task.
    */
-  def writeProcess(
+  def write(
       rdd: RDD[_],
       dep: ShuffleDependency[_, _, _],
       partitionId: Int,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
index 9b05faaed0459..079ff25fcb67e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala
@@ -22,7 +22,7 @@ import java.util.Arrays
 import org.apache.spark._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter}
 
 /**
  * The [[Partition]] used by [[ShuffledRowRDD]]. A post-shuffle partition
@@ -157,9 +157,9 @@ class ShuffledRowRDD(
   override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
     val shuffledRowPartition = split.asInstanceOf[ShuffledRowRDDPartition]
     val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics()
-    // `SQLShuffleMetricsReporter` will update its own metrics for SQL exchange operator,
+    // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator,
     // as well as the `tempMetrics` for basic shuffle metrics.
-    val sqlMetricsReporter = new SQLShuffleMetricsReporter(tempMetrics, metrics)
+    val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics)
     // The range of pre-shuffle partitions that we are fetching at here is
     // [startPreShufflePartitionIndex, endPreShufflePartitionIndex - 1].
     val reader =
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 0c2020572e721..da7b0c6f43fbc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns
 import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.execution._
-import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
@@ -50,7 +50,7 @@ case class ShuffleExchangeExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = Map(
     "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
   ) ++ readMetrics ++ writeMetrics
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
index 1f2fdde538645..bfaf080292bce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/limit.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, CodeGe
 import org.apache.spark.sql.catalyst.plans.physical._
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
-import org.apache.spark.sql.execution.metric.{SQLShuffleMetricsReporter, SQLShuffleWriteMetricsReporter}
+import org.apache.spark.sql.execution.metric.{SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}
 
 /**
  * Take the first `limit` elements and collect them to a single partition.
@@ -41,7 +41,7 @@ case class CollectLimitExec(limit: Int, child: SparkPlan) extends UnaryExecNode
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
   protected override def doExecute(): RDD[InternalRow] = {
     val locallyLimited = child.execute().mapPartitionsInternal(_.take(limit))
@@ -165,7 +165,7 @@ case class TakeOrderedAndProjectExec(
   private lazy val writeMetrics =
     SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
   private lazy val readMetrics =
-    SQLShuffleMetricsReporter.createShuffleReadMetrics(sparkContext)
+    SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
   override lazy val metrics = readMetrics ++ writeMetrics
 
   protected override def doExecute(): RDD[InternalRow] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
index ff7941e3b3e8d..2c0ea80495abb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLShuffleMetricsReporter.scala
@@ -27,23 +27,23 @@ import org.apache.spark.shuffle.ShuffleWriteMetricsReporter
 * @param metrics All metrics in current SparkPlan. This param should not empty and
  *   contains all shuffle metrics defined in createShuffleReadMetrics.
  */
-private[spark] class SQLShuffleMetricsReporter(
+class SQLShuffleReadMetricsReporter(
     tempMetrics: TempShuffleReadMetrics,
     metrics: Map[String, SQLMetric]) extends TempShuffleReadMetrics {
   private[this] val _remoteBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BLOCKS_FETCHED)
   private[this] val _localBlocksFetched =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BLOCKS_FETCHED)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BLOCKS_FETCHED)
   private[this] val _remoteBytesRead =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ)
   private[this] val _remoteBytesReadToDisk =
-    metrics(SQLShuffleMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
+    metrics(SQLShuffleReadMetricsReporter.REMOTE_BYTES_READ_TO_DISK)
   private[this] val _localBytesRead =
-    metrics(SQLShuffleMetricsReporter.LOCAL_BYTES_READ)
+    metrics(SQLShuffleReadMetricsReporter.LOCAL_BYTES_READ)
   private[this] val _fetchWaitTime =
-    metrics(SQLShuffleMetricsReporter.FETCH_WAIT_TIME)
+    metrics(SQLShuffleReadMetricsReporter.FETCH_WAIT_TIME)
   private[this] val _recordsRead =
-    metrics(SQLShuffleMetricsReporter.RECORDS_READ)
+    metrics(SQLShuffleReadMetricsReporter.RECORDS_READ)
 
   override def incRemoteBlocksFetched(v: Long): Unit = {
     _remoteBlocksFetched.add(v)
@@ -75,7 +75,7 @@ private[spark] class SQLShuffleMetricsReporter(
   }
 }
 
-private[spark] object SQLShuffleMetricsReporter {
+object SQLShuffleReadMetricsReporter {
   val REMOTE_BLOCKS_FETCHED = "remoteBlocksFetched"
   val LOCAL_BLOCKS_FETCHED = "localBlocksFetched"
   val REMOTE_BYTES_READ = "remoteBytesRead"
@@ -88,8 +88,8 @@ private[spark] object SQLShuffleMetricsReporter {
    * Create all shuffle read relative metrics and return the Map.
    */
   def createShuffleReadMetrics(sc: SparkContext): Map[String, SQLMetric] = Map(
-    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks fetched"),
-    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks fetched"),
+    REMOTE_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "remote blocks read"),
+    LOCAL_BLOCKS_FETCHED -> SQLMetrics.createMetric(sc, "local blocks read"),
     REMOTE_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "remote bytes read"),
     REMOTE_BYTES_READ_TO_DISK -> SQLMetrics.createSizeMetric(sc, "remote bytes read to disk"),
     LOCAL_BYTES_READ -> SQLMetrics.createSizeMetric(sc, "local bytes read"),
@@ -102,7 +102,7 @@ private[spark] object SQLShuffleMetricsReporter {
 * @param metricsReporter Other reporter need to be updated in this SQLShuffleWriteMetricsReporter.
  * @param metrics Shuffle write metrics in current SparkPlan.
  */
-private[spark] class SQLShuffleWriteMetricsReporter(
+class SQLShuffleWriteMetricsReporter(
     metricsReporter: ShuffleWriteMetricsReporter,
     metrics: Map[String, SQLMetric]) extends ShuffleWriteMetricsReporter {
   private[this] val _bytesWritten =
@@ -112,29 +112,29 @@ private[spark] class SQLShuffleWriteMetricsReporter(
   private[this] val _writeTime =
     metrics(SQLShuffleWriteMetricsReporter.SHUFFLE_WRITE_TIME)
 
-  override private[spark] def incBytesWritten(v: Long): Unit = {
+  override def incBytesWritten(v: Long): Unit = {
     metricsReporter.incBytesWritten(v)
     _bytesWritten.add(v)
   }
-  override private[spark] def decRecordsWritten(v: Long): Unit = {
+  override def decRecordsWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _recordsWritten.set(_recordsWritten.value - v)
   }
-  override private[spark] def incRecordsWritten(v: Long): Unit = {
+  override def incRecordsWritten(v: Long): Unit = {
     metricsReporter.incRecordsWritten(v)
     _recordsWritten.add(v)
   }
-  override private[spark] def incWriteTime(v: Long): Unit = {
+  override def incWriteTime(v: Long): Unit = {
     metricsReporter.incWriteTime(v)
     _writeTime.add(v)
   }
-  override private[spark] def decBytesWritten(v: Long): Unit = {
+  override def decBytesWritten(v: Long): Unit = {
     metricsReporter.decBytesWritten(v)
     _bytesWritten.set(_bytesWritten.value - v)
   }
 }
 
-private[spark] object SQLShuffleWriteMetricsReporter {
+object SQLShuffleWriteMetricsReporter {
   val SHUFFLE_BYTES_WRITTEN = "shuffleBytesWritten"
   val SHUFFLE_RECORDS_WRITTEN = "shuffleRecordsWritten"
   val SHUFFLE_WRITE_TIME = "shuffleWriteTime"
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
index 1ad5713ab8ae6..ca8692290edb2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeRowSerializerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{LocalSparkSession, Row, SparkSession}
 import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
 import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
-import org.apache.spark.sql.execution.metric.SQLShuffleMetricsReporter
+import org.apache.spark.sql.execution.metric.SQLShuffleReadMetricsReporter
 import org.apache.spark.sql.types._
 import org.apache.spark.storage.ShuffleBlockId
 import org.apache.spark.util.collection.ExternalSorter
@@ -140,7 +140,7 @@ class UnsafeRowSerializerSuite extends SparkFunSuite with LocalSparkSession {
         new UnsafeRowSerializer(2))
     val shuffled = new ShuffledRowRDD(
       dependency,
-      SQLShuffleMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
+      SQLShuffleReadMetricsReporter.createShuffleReadMetrics(spark.sparkContext))
     shuffled.count()
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 4a80638f68858..afae914ff8a5b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -96,8 +96,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
-      "local blocks fetched" -> 2L,
-      "remote blocks fetched" -> 0L,
+      "local blocks read" -> 2L,
+      "remote blocks read" -> 0L,
       "shuffle records written" -> 2L)
     testSparkPlanMetrics(df, 1, Map(
       2L -> (("HashAggregate", expected1(0))),
@@ -114,8 +114,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
         "avg hash probe (min, med, max)" -> "\n(1, 1, 1)"))
     val shuffleExpected2 = Map(
       "records read" -> 4L,
-      "local blocks fetched" -> 4L,
-      "remote blocks fetched" -> 0L,
+      "local blocks read" -> 4L,
+      "remote blocks read" -> 0L,
       "shuffle records written" -> 4L)
     testSparkPlanMetrics(df2, 1, Map(
       2L -> (("HashAggregate", expected2(0))),
@@ -175,8 +175,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       1L -> (("Exchange", Map(
         "shuffle records written" -> 2L,
         "records read" -> 2L,
-        "local blocks fetched" -> 2L,
-        "remote blocks fetched" -> 0L))),
+        "local blocks read" -> 2L,
+        "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map("number of output rows" -> 1L))))
     )
 
@@ -187,8 +187,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
       1L -> (("Exchange", Map(
         "shuffle records written" -> 4L,
         "records read" -> 4L,
-        "local blocks fetched" -> 4L,
-        "remote blocks fetched" -> 0L))),
+        "local blocks read" -> 4L,
+        "remote blocks read" -> 0L))),
       0L -> (("ObjectHashAggregate", Map("number of output rows" -> 3L))))
     )
   }
@@ -216,8 +216,8 @@ class SQLMetricsSuite extends SparkFunSuite with SQLMetricsTestUtils with Shared
           "number of output rows" -> 4L))),
         2L -> (("Exchange", Map(
           "records read" -> 4L,
-          "local blocks fetched" -> 2L,
-          "remote blocks fetched" -> 0L,
+          "local blocks read" -> 2L,
+          "remote blocks read" -> 0L,
           "shuffle records written" -> 2L))))
       )
     }
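
For readers tracking the rename only through this diff: after the change, a shuffle-producing SQL operator wires the reporters roughly as in the sketch below. This is illustrative only (not part of the PR); it restates the pattern visible in ShuffleExchangeExec and CollectLimitExec above, and assumes it sits inside a physical operator with a `sparkContext` in scope:

    import org.apache.spark.sql.execution.metric.{SQLMetrics,
      SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter}

    // Shuffle write metrics, created via SQLShuffleWriteMetricsReporter.
    private lazy val writeMetrics =
      SQLShuffleWriteMetricsReporter.createShuffleWriteMetrics(sparkContext)
    // Shuffle read metrics, now created via SQLShuffleReadMetricsReporter
    // (previously SQLShuffleMetricsReporter).
    private lazy val readMetrics =
      SQLShuffleReadMetricsReporter.createShuffleReadMetrics(sparkContext)
    // Both metric maps are merged into the operator's `metrics`, so the SQL UI
    // shows the renamed display text ("local blocks read" / "remote blocks read").
    override lazy val metrics = Map(
      "dataSize" -> SQLMetrics.createSizeMetric(sparkContext, "data size")
    ) ++ readMetrics ++ writeMetrics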


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Implement shuffle write metrics in SQL
> --------------------------------------
>
>                 Key: SPARK-26193
>                 URL: https://issues.apache.org/jira/browse/SPARK-26193
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 3.0.0
>            Reporter: Xiao Li
>            Assignee: Yuanjian Li
>            Priority: Major
>             Fix For: 3.0.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
