This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ec13854620 [SPARK-43334][UI] Fix error while serializing ExecutorPeakMetricsDistributions into API response
5ec13854620 is described below

commit 5ec138546205ba4248cc9ec72c3b7baf60f2fede
Author: Thejdeep Gudivada <tgudiv...@linkedin.com>
AuthorDate: Wed May 24 18:25:36 2023 -0500

    [SPARK-43334][UI] Fix error while serializing ExecutorPeakMetricsDistributions into API response
    
    ### What changes were proposed in this pull request?
    When we calculate the quantile information from the peak executor metrics values for the distribution, there is a possibility of running into an `ArrayIndexOutOfBounds` exception when the metric values are empty. This PR fixes that by returning a sequence of zeros, one per requested quantile, when the values are empty.
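
    A minimal standalone sketch of the failure mode in the old index math, assuming an empty metrics sequence (illustrative names, not part of the commit):

        val values = IndexedSeq.empty[Double]
        val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
        val count = values.size  // 0
        // math.min((q * count).toLong, count - 1) clamps every index to -1
        val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
        // values(indices(0).toInt) then throws an index-out-of-bounds exception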
    
    ### Why are the changes needed?
    Without these changes, when the withDetails query parameter is used to query the stages REST API, we encounter a partial JSON response, since the peak executor metrics distribution cannot be serialized due to the above index error.
    
    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added a unit test covering this behavior.
    
    Closes #41017 from thejdeep/SPARK-43334.
    
    Authored-by: Thejdeep Gudivada <tgudiv...@linkedin.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../main/scala/org/apache/spark/status/AppStatusStore.scala  |  9 +--------
 .../main/scala/org/apache/spark/status/AppStatusUtils.scala  | 12 ++++++++++++
 core/src/main/scala/org/apache/spark/status/api/v1/api.scala |  7 +++----
 .../scala/org/apache/spark/status/AppStatusUtilsSuite.scala  | 11 +++++++++++
 4 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index d02d4b2507a..eaa7b7b9873 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR
+import org.apache.spark.status.AppStatusUtils.getQuantilesValue
 import org.apache.spark.status.api.v1
 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID
 import org.apache.spark.ui.scope._
@@ -770,14 +771,6 @@ private[spark] class AppStatusStore(
     }
   }
 
-  def getQuantilesValue(
-    values: IndexedSeq[Double],
-    quantiles: Array[Double]): IndexedSeq[Double] = {
-    val count = values.size
-    val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
-    indices.map(i => values(i.toInt)).toIndexedSeq
-  }
-
   def rdd(rddId: Int): v1.RDDStorageInfo = {
     store.read(classOf[RDDStorageInfoWrapper], rddId).info
   }
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
index 87f434daf48..04918ccbd57 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
@@ -72,4 +72,16 @@ private[spark] object AppStatusUtils {
       -1
     }
   }
+
+  def getQuantilesValue(
+    values: IndexedSeq[Double],
+    quantiles: Array[Double]): IndexedSeq[Double] = {
+    val count = values.size
+    if (count > 0) {
+      val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
+      indices.map(i => values(i.toInt)).toIndexedSeq
+    } else {
+      IndexedSeq.fill(quantiles.length)(0.0)
+    }
+  }
 }
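
For reference, a quick illustration of the patched helper's behavior,
mirroring the new unit test below (a standalone sketch, not part of the
diff):

    import org.apache.spark.status.AppStatusUtils.getQuantilesValue

    val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
    getQuantilesValue(IndexedSeq(1.0, 2.0, 3.0, 4.0), quantiles)
    // => 1.0, 2.0, 3.0, 4.0, 4.0
    getQuantilesValue(IndexedSeq.empty[Double], quantiles)
    // => 0.0, 0.0, 0.0, 0.0, 0.0 (previously threw ArrayIndexOutOfBounds)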
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index e272cf04dc7..f436d16ca47 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -31,6 +31,7 @@ import org.apache.spark.JobExecutionStatus
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
+import org.apache.spark.status.AppStatusUtils.getQuantilesValue
 
 case class ApplicationInfo private[spark](
     id: String,
@@ -454,13 +455,11 @@ class ExecutorMetricsDistributions private[spark](
 class ExecutorPeakMetricsDistributions private[spark](
   val quantiles: IndexedSeq[Double],
   val executorMetrics: IndexedSeq[ExecutorMetrics]) {
-  private lazy val count = executorMetrics.length
-  private lazy val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) }
 
   /** Returns the distributions for the specified metric. */
   def getMetricDistribution(metricName: String): IndexedSeq[Double] = {
-    val sorted = executorMetrics.map(_.getMetricValue(metricName)).sorted
-    indices.map(i => sorted(i.toInt).toDouble)
+    val sorted = executorMetrics.map(_.getMetricValue(metricName).toDouble).sorted
+    getQuantilesValue(sorted, quantiles.toArray)
   }
 }
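
With this change, getMetricDistribution on the v1 API type funnels
through the shared helper, so a distribution computed over zero
executors serializes cleanly. A flattened sketch of the same logic (the
free-standing signature is illustrative; the real method lives on
ExecutorPeakMetricsDistributions as shown above):

    // One peak value per executor in, one value per requested quantile out.
    // Empty input now yields zeros instead of a -1 index lookup.
    def metricDistribution(
        peakValues: IndexedSeq[Long],
        quantiles: IndexedSeq[Double]): IndexedSeq[Double] = {
      val sorted = peakValues.map(_.toDouble).sorted
      getQuantilesValue(sorted, quantiles.toArray)
    }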
 
diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala
index da14dcd5416..9c1a280d2b6 100644
--- a/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.status
 import java.util.Date
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.status.AppStatusUtils.getQuantilesValue
 import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
 
 class AppStatusUtilsSuite extends SparkFunSuite {
@@ -94,4 +95,14 @@ class AppStatusUtilsSuite extends SparkFunSuite {
       gettingResultTime = 0L)
     assert(AppStatusUtils.schedulerDelay(finishedTask) === 3L)
   }
+
+  test("getQuantilesValue") {
+    val values = IndexedSeq(1.0, 2.0, 3.0, 4.0)
+    val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0)
+    assert(getQuantilesValue(values, quantiles) == IndexedSeq(1.0, 2.0, 3.0, 4.0, 4.0))
+
+    // When values are empty
+    val emptyValue = IndexedSeq()
+    assert(getQuantilesValue(emptyValue, quantiles) == IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0))
+  }
 }

