This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5ec13854620 [SPARK-43334][UI] Fix error while serializing ExecutorPeakMetricsDistributions into API response 5ec13854620 is described below commit 5ec138546205ba4248cc9ec72c3b7baf60f2fede Author: Thejdeep Gudivada <tgudiv...@linkedin.com> AuthorDate: Wed May 24 18:25:36 2023 -0500 [SPARK-43334][UI] Fix error while serializing ExecutorPeakMetricsDistributions into API response When we calculate the quantile information from the peak executor metrics values for the distribution, there is a possibility of running into an index-out-of-bounds exception when the metric values are empty. This PR fixes that by returning 0.0 for each requested quantile when the values are empty. ### Why are the changes needed? Without these changes, when the `withDetails` query parameter is used to query the stages REST API, the response is partial (truncated) JSON, because the peak executor metrics distribution cannot be serialized due to the above index error. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test for this behavior. Closes #41017 from thejdeep/SPARK-43334. 
Authored-by: Thejdeep Gudivada <tgudiv...@linkedin.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../main/scala/org/apache/spark/status/AppStatusStore.scala | 9 +-------- .../main/scala/org/apache/spark/status/AppStatusUtils.scala | 12 ++++++++++++ core/src/main/scala/org/apache/spark/status/api/v1/api.scala | 7 +++---- .../scala/org/apache/spark/status/AppStatusUtilsSuite.scala | 11 +++++++++++ 4 files changed, 27 insertions(+), 12 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala index d02d4b2507a..eaa7b7b9873 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala @@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap import org.apache.spark.{JobExecutionStatus, SparkConf, SparkContext} import org.apache.spark.internal.Logging import org.apache.spark.internal.config.Status.LIVE_UI_LOCAL_STORE_DIR +import org.apache.spark.status.AppStatusUtils.getQuantilesValue import org.apache.spark.status.api.v1 import org.apache.spark.storage.FallbackStorage.FALLBACK_BLOCK_MANAGER_ID import org.apache.spark.ui.scope._ @@ -770,14 +771,6 @@ private[spark] class AppStatusStore( } } - def getQuantilesValue( - values: IndexedSeq[Double], - quantiles: Array[Double]): IndexedSeq[Double] = { - val count = values.size - val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } - indices.map(i => values(i.toInt)).toIndexedSeq - } - def rdd(rddId: Int): v1.RDDStorageInfo = { store.read(classOf[RDDStorageInfoWrapper], rddId).info } diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala index 87f434daf48..04918ccbd57 100644 --- a/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala +++ b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala @@ -72,4 
+72,16 @@ private[spark] object AppStatusUtils { -1 } } + + def getQuantilesValue( + values: IndexedSeq[Double], + quantiles: Array[Double]): IndexedSeq[Double] = { + val count = values.size + if (count > 0) { + val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } + indices.map(i => values(i.toInt)).toIndexedSeq + } else { + IndexedSeq.fill(quantiles.length)(0.0) + } + } } diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index e272cf04dc7..f436d16ca47 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -31,6 +31,7 @@ import org.apache.spark.JobExecutionStatus import org.apache.spark.executor.ExecutorMetrics import org.apache.spark.metrics.ExecutorMetricType import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest} +import org.apache.spark.status.AppStatusUtils.getQuantilesValue case class ApplicationInfo private[spark]( id: String, @@ -454,13 +455,11 @@ class ExecutorMetricsDistributions private[spark]( class ExecutorPeakMetricsDistributions private[spark]( val quantiles: IndexedSeq[Double], val executorMetrics: IndexedSeq[ExecutorMetrics]) { - private lazy val count = executorMetrics.length - private lazy val indices = quantiles.map { q => math.min((q * count).toLong, count - 1) } /** Returns the distributions for the specified metric. 
*/ def getMetricDistribution(metricName: String): IndexedSeq[Double] = { - val sorted = executorMetrics.map(_.getMetricValue(metricName)).sorted - indices.map(i => sorted(i.toInt).toDouble) + val sorted = executorMetrics.map(_.getMetricValue(metricName).toDouble).sorted + getQuantilesValue(sorted, quantiles.toArray) } } diff --git a/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala b/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala index da14dcd5416..9c1a280d2b6 100644 --- a/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.status import java.util.Date import org.apache.spark.SparkFunSuite +import org.apache.spark.status.AppStatusUtils.getQuantilesValue import org.apache.spark.status.api.v1.{TaskData, TaskMetrics} class AppStatusUtilsSuite extends SparkFunSuite { @@ -94,4 +95,14 @@ class AppStatusUtilsSuite extends SparkFunSuite { gettingResultTime = 0L) assert(AppStatusUtils.schedulerDelay(finishedTask) === 3L) } + + test("getQuantilesValue") { + val values = IndexedSeq(1.0, 2.0, 3.0, 4.0) + val quantiles = Array(0.0, 0.25, 0.5, 0.75, 1.0) + assert(getQuantilesValue(values, quantiles) == IndexedSeq(1.0, 2.0, 3.0, 4.0, 4.0)) + + // When values are empty + val emptyValue = IndexedSeq() + assert(getQuantilesValue(emptyValue, quantiles) == IndexedSeq(0.0, 0.0, 0.0, 0.0, 0.0)) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org