This is an automated email from the ASF dual-hosted git repository.

mridulm80 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0ad7677  [SPARK-38309][CORE] Fix SHS `shuffleTotalReads` and 
`shuffleTotalBlocks` percentile metrics
0ad7677 is described below

commit 0ad76777e76f60d1aea0eed0a2a7bff20c7567d3
Author: Rob Reeves <roree...@linkedin.com>
AuthorDate: Tue Mar 8 12:20:43 2022 -0600

    [SPARK-38309][CORE] Fix SHS `shuffleTotalReads` and `shuffleTotalBlocks` 
percentile metrics
    
    ### What changes were proposed in this pull request?
    
    #### Background
    In PR #26508 (SPARK-26260) the SHS stage metric percentiles were updated to 
only include successful tasks when using disk storage. It did this by making 
the values for each metric negative when the task is not in a successful state. 
This approach was chosen to avoid breaking changes to disk storage. See [this 
comment](https://github.com/apache/spark/pull/26508#issuecomment-554540314) for 
context.
    
    To get the percentiles, it reads the metric values, starting at 0, in 
ascending order. This filters out all tasks that are not successful because the 
values are less than 0. To get the percentile values it scales the percentiles 
to the list index of successful tasks. For example if there are 200 tasks and 
you want percentiles [0, 25, 50, 75, 100] the lookup indexes in the task 
collection are [0, 50, 100, 150, 199].
    
    #### Issue
    For metrics 1) shuffle total reads and 2) shuffle total blocks, PR #26508 
incorrectly makes the metric indices positive. This means tasks that are not 
successful are included in the percentile calculations. The percentile lookup 
index calculation is still based on the number of successful task so the wrong 
task metric is returned for a given percentile. This was not caught because the 
unit test only verified values for one metric, `executorRunTime`.
    
    #### Fix
    The index values for `SHUFFLE_TOTAL_READS` and `SHUFFLE_TOTAL_BLOCKS` 
should not convert back to positive metric values for tasks that are not 
successful. I believe this was done because the metrics values are summed from 
two other metrics. Using the raw values still creates the desired outcome. 
`negative + negative = negative` and `positive + positive = positive`. There is 
no case where one metric will be negative and one will be positive. I also 
verified that these two metrics are o [...]
    
    ### Why are the changes needed?
    This change is required so that the SHS stage percentile metrics for 
shuffle read bytes and shuffle total blocks are correct.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. The user will see the correct percentile values for the stage summary 
shuffle read bytes.
    
    ### How was this patch tested?
    I updated the unit test to verify the percentile values for every task 
metric. I also modified the unit test to have unique values for every metric. 
Previously the test had the same metrics for every field. This would not catch 
bugs like the wrong field being read by accident.
    
    I manually validated the fix in the UI.
    
    **BEFORE**
    
![image](https://user-images.githubusercontent.com/5604993/155433460-322078c5-1821-4f2e-8e53-8fc3902eb7fe.png)
    
    **AFTER**
    
![image](https://user-images.githubusercontent.com/5604993/155433491-25ce3acf-290b-4b83-a0a9-0f9b71c7af04.png)
    
    I manually validated the fix in the task summary API 
(`/api/v1/applications/application_123/1/stages/14/0/taskSummary\?quantiles\=0,0.25,0.5,0.75,1.0`).
 See `shuffleReadMetrics.readBytes` and `shuffleReadMetrics.totalBlocksFetched`.
    
    Before:
    ```json
    {
       "quantiles":[
          0.0,
          0.25,
          0.5,
          0.75,
          1.0
       ],
       "shuffleReadMetrics":{
          "readBytes":[
             -2.0,
             -2.0,
             -2.0,
             -2.0,
             5.63718681E8
          ],
          "totalBlocksFetched":[
             -2.0,
             -2.0,
             -2.0,
             -2.0,
             2.0
          ],
          ...
       },
       ...
    }
    ```
    
    After:
    ```json
    {
       "quantiles":[
          0.0,
          0.25,
          0.5,
          0.75,
          1.0
       ],
       "shuffleReadMetrics":{
          "readBytes":[
             5.62865286E8,
             5.63779421E8,
             5.63941681E8,
             5.64327925E8,
             5.7674183E8
          ],
          "totalBlocksFetched":[
             2.0,
             2.0,
             2.0,
             2.0,
             2.0
          ],
          ...
       }
       ...
    }
    ```
    
    Closes #35637 from robreeves/SPARK-38309.
    
    Authored-by: Rob Reeves <roree...@linkedin.com>
    Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
---
 .../scala/org/apache/spark/status/storeTypes.scala |   4 +-
 .../apache/spark/status/AppStatusStoreSuite.scala  | 108 +++++++++++++++------
 2 files changed, 81 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala 
b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 103e4ba..39bf593 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -344,7 +344,7 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_READS, parent = 
TaskIndexNames.STAGE)
   private def shuffleTotalReads: Long = {
     if (hasMetrics) {
-      getMetricValue(shuffleLocalBytesRead) + 
getMetricValue(shuffleRemoteBytesRead)
+      shuffleLocalBytesRead + shuffleRemoteBytesRead
     } else {
       -1L
     }
@@ -353,7 +353,7 @@ private[spark] class TaskDataWrapper(
   @JsonIgnore @KVIndex(value = TaskIndexNames.SHUFFLE_TOTAL_BLOCKS, parent = 
TaskIndexNames.STAGE)
   private def shuffleTotalBlocks: Long = {
     if (hasMetrics) {
-      getMetricValue(shuffleLocalBlocksFetched) + 
getMetricValue(shuffleRemoteBlocksFetched)
+      shuffleLocalBlocksFetched + shuffleRemoteBlocksFetched
     } else {
       -1L
     }
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
index 798cff8..53b0131 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusStoreSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.status
 
+import scala.util.Random
+
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.internal.config.History.{HYBRID_STORE_DISK_BACKEND, 
HybridStoreDiskBackend}
@@ -137,13 +139,52 @@ class AppStatusStoreSuite extends SparkFunSuite {
        * Task summary will consider (1, 3, 5) only
        */
       val summary = appStore.taskSummary(stageId, attemptId, uiQuantiles).get
+      val successfulTasks = Array(getTaskMetrics(1), getTaskMetrics(3), 
getTaskMetrics(5))
 
-      val values = Array(1.0, 3.0, 5.0)
+      def assertQuantiles(metricGetter: TaskMetrics => Double,
+        actualQuantiles: Seq[Double]): Unit = {
+        val values = successfulTasks.map(metricGetter)
+        val expectedQuantiles = new Distribution(values, 0, values.length)
+          .getQuantiles(uiQuantiles.sorted)
 
-      val dist = new Distribution(values, 0, 
values.length).getQuantiles(uiQuantiles.sorted)
-      dist.zip(summary.executorRunTime).foreach { case (expected, actual) =>
-        assert(expected === actual)
+        assert(actualQuantiles === expectedQuantiles)
       }
+
+      assertQuantiles(_.executorDeserializeTime, 
summary.executorDeserializeTime)
+      assertQuantiles(_.executorDeserializeCpuTime, 
summary.executorDeserializeCpuTime)
+      assertQuantiles(_.executorRunTime, summary.executorRunTime)
+      assertQuantiles(_.executorRunTime, summary.executorRunTime)
+      assertQuantiles(_.executorCpuTime, summary.executorCpuTime)
+      assertQuantiles(_.resultSize, summary.resultSize)
+      assertQuantiles(_.jvmGCTime, summary.jvmGcTime)
+      assertQuantiles(_.resultSerializationTime, 
summary.resultSerializationTime)
+      assertQuantiles(_.memoryBytesSpilled, summary.memoryBytesSpilled)
+      assertQuantiles(_.diskBytesSpilled, summary.diskBytesSpilled)
+      assertQuantiles(_.peakExecutionMemory, summary.peakExecutionMemory)
+      assertQuantiles(_.inputMetrics.bytesRead, summary.inputMetrics.bytesRead)
+      assertQuantiles(_.inputMetrics.recordsRead, 
summary.inputMetrics.recordsRead)
+      assertQuantiles(_.outputMetrics.bytesWritten, 
summary.outputMetrics.bytesWritten)
+      assertQuantiles(_.outputMetrics.recordsWritten, 
summary.outputMetrics.recordsWritten)
+      assertQuantiles(_.shuffleReadMetrics.remoteBlocksFetched,
+        summary.shuffleReadMetrics.remoteBlocksFetched)
+      assertQuantiles(_.shuffleReadMetrics.localBlocksFetched,
+        summary.shuffleReadMetrics.localBlocksFetched)
+      assertQuantiles(_.shuffleReadMetrics.fetchWaitTime, 
summary.shuffleReadMetrics.fetchWaitTime)
+      assertQuantiles(_.shuffleReadMetrics.remoteBytesRead,
+        summary.shuffleReadMetrics.remoteBytesRead)
+      assertQuantiles(_.shuffleReadMetrics.remoteBytesReadToDisk,
+        summary.shuffleReadMetrics.remoteBytesReadToDisk)
+      assertQuantiles(
+        t => t.shuffleReadMetrics.localBytesRead + 
t.shuffleReadMetrics.remoteBytesRead,
+        summary.shuffleReadMetrics.readBytes)
+      assertQuantiles(
+        t => t.shuffleReadMetrics.localBlocksFetched + 
t.shuffleReadMetrics.remoteBlocksFetched,
+        summary.shuffleReadMetrics.totalBlocksFetched)
+      assertQuantiles(_.shuffleWriteMetrics.bytesWritten, 
summary.shuffleWriteMetrics.writeBytes)
+      assertQuantiles(_.shuffleWriteMetrics.writeTime, 
summary.shuffleWriteMetrics.writeTime)
+      assertQuantiles(_.shuffleWriteMetrics.recordsWritten,
+        summary.shuffleWriteMetrics.writeRecords)
+
       appStore.close()
     }
   }
@@ -227,32 +268,41 @@ class AppStatusStoreSuite extends SparkFunSuite {
     liveTask.write(store.asInstanceOf[ElementTrackingStore], 1L)
   }
 
-  private def getTaskMetrics(i: Int): TaskMetrics = {
+  /**
+   * Creates fake task metrics
+   * @param seed The random seed. The output will be reproducible for a given 
seed.
+   * @return The test metrics object with fake data
+   */
+  private def getTaskMetrics(seed: Int): TaskMetrics = {
+    val random = new Random(seed)
+    val randomMax = 1000
+    def nextInt(): Int = random.nextInt(randomMax)
+
     val taskMetrics = new TaskMetrics()
-    taskMetrics.setExecutorDeserializeTime(i)
-    taskMetrics.setExecutorDeserializeCpuTime(i)
-    taskMetrics.setExecutorRunTime(i)
-    taskMetrics.setExecutorCpuTime(i)
-    taskMetrics.setResultSize(i)
-    taskMetrics.setJvmGCTime(i)
-    taskMetrics.setResultSerializationTime(i)
-    taskMetrics.incMemoryBytesSpilled(i)
-    taskMetrics.incDiskBytesSpilled(i)
-    taskMetrics.incPeakExecutionMemory(i)
-    taskMetrics.inputMetrics.incBytesRead(i)
-    taskMetrics.inputMetrics.incRecordsRead(i)
-    taskMetrics.outputMetrics.setBytesWritten(i)
-    taskMetrics.outputMetrics.setRecordsWritten(i)
-    taskMetrics.shuffleReadMetrics.incRemoteBlocksFetched(i)
-    taskMetrics.shuffleReadMetrics.incLocalBlocksFetched(i)
-    taskMetrics.shuffleReadMetrics.incFetchWaitTime(i)
-    taskMetrics.shuffleReadMetrics.incRemoteBytesRead(i)
-    taskMetrics.shuffleReadMetrics.incRemoteBytesReadToDisk(i)
-    taskMetrics.shuffleReadMetrics.incLocalBytesRead(i)
-    taskMetrics.shuffleReadMetrics.incRecordsRead(i)
-    taskMetrics.shuffleWriteMetrics.incBytesWritten(i)
-    taskMetrics.shuffleWriteMetrics.incWriteTime(i)
-    taskMetrics.shuffleWriteMetrics.incRecordsWritten(i)
+    taskMetrics.setExecutorDeserializeTime(nextInt())
+    taskMetrics.setExecutorDeserializeCpuTime(nextInt())
+    taskMetrics.setExecutorRunTime(nextInt())
+    taskMetrics.setExecutorCpuTime(nextInt())
+    taskMetrics.setResultSize(nextInt())
+    taskMetrics.setJvmGCTime(nextInt())
+    taskMetrics.setResultSerializationTime(nextInt())
+    taskMetrics.incMemoryBytesSpilled(nextInt())
+    taskMetrics.incDiskBytesSpilled(nextInt())
+    taskMetrics.incPeakExecutionMemory(nextInt())
+    taskMetrics.inputMetrics.incBytesRead(nextInt())
+    taskMetrics.inputMetrics.incRecordsRead(nextInt())
+    taskMetrics.outputMetrics.setBytesWritten(nextInt())
+    taskMetrics.outputMetrics.setRecordsWritten(nextInt())
+    taskMetrics.shuffleReadMetrics.incRemoteBlocksFetched(nextInt())
+    taskMetrics.shuffleReadMetrics.incLocalBlocksFetched(nextInt())
+    taskMetrics.shuffleReadMetrics.incFetchWaitTime(nextInt())
+    taskMetrics.shuffleReadMetrics.incRemoteBytesRead(nextInt())
+    taskMetrics.shuffleReadMetrics.incRemoteBytesReadToDisk(nextInt())
+    taskMetrics.shuffleReadMetrics.incLocalBytesRead(nextInt())
+    taskMetrics.shuffleReadMetrics.incRecordsRead(nextInt())
+    taskMetrics.shuffleWriteMetrics.incBytesWritten(nextInt())
+    taskMetrics.shuffleWriteMetrics.incWriteTime(nextInt())
+    taskMetrics.shuffleWriteMetrics.incRecordsWritten(nextInt())
     taskMetrics
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to