This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new dddfeca  [SPARK-30209][SQL][WEB-UI] Display stageId, attemptId and taskId for max metrics in Spark UI
dddfeca is described below

commit dddfeca175bdce5294debe00d4a993daef92ca60
Author: Niranjan Artal <nar...@nvidia.com>
AuthorDate: Mon Dec 16 15:27:34 2019 -0600

    [SPARK-30209][SQL][WEB-UI] Display stageId, attemptId and taskId for max metrics in Spark UI
    
    ### What changes were proposed in this pull request?
    
    SPARK-30209 proposes adding additional metrics such as stageId, attemptId and taskId for max metrics. The data required for display is already available in LiveStageMetrics; it only needs to be captured and passed through so it can be shown on the UI. To minimize the memory used, we save only the maximum of each metric id per stage, so the additional memory usage per stage is (#metrics * 4 * sizeof(Long)).
    The max for each metric id is then computed across all stages and passed to the stringValue method, so the memory used is minimal. A runtime benchmark shows Stage.Proc time increased to around 1.5-2.5x, but Aggregate time decreased.
    
    ### Why are the changes needed?
    
    The additional stageId, attemptId and taskId metrics help in debugging jobs more quickly. For a given operator, the task taking the longest to complete can be identified directly from the SQL tab.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. stageId, attemptId and taskId are shown only for executor-side metrics. For driver metrics, "(driver)" is displayed on the UI (a sketch of this label logic follows the screenshots below).
    ![image (3)](https://user-images.githubusercontent.com/50492963/70763041-929d9980-1d07-11ea-940f-88ac6bdce9b5.png)
    
    "Driver"
    ![image (4)](https://user-images.githubusercontent.com/50492963/70763043-94675d00-1d07-11ea-95ab-3478728cb435.png)
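    
    For reference, a standalone sketch of the suffix logic (mirroring the change to SQLMetrics.stringValue in this patch; the object name is illustrative): an empty maxMetrics array means the value came from driver-side updates, otherwise the stage/attempt/task suffix is built from the array.
    
    ```scala
    // Illustrative standalone version of the label appended to the max value.
    object MaxMetricLabelSketch {
      // maxMetrics layout, as produced by the listener: Array(value, stageId, attemptId, taskId)
      def label(maxMetrics: Array[Long]): String =
        if (maxMetrics.isEmpty) "(driver)"
        else s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task ${maxMetrics(3)})"
    
      def main(args: Array[String]): Unit = {
        println(label(Array.empty[Long]))          // (driver)
        println(label(Array(42L, 3L, 0L, 217L)))   // (stage 3 (attempt 0): task 217)
      }
    }
    ```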
    
    ### How was this patch tested?
    
    Manually tested, ran benchmark script for runtime.
    
    Closes #26843 from nartal1/SPARK-30209.
    
    Authored-by: Niranjan Artal <nar...@nvidia.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/sql/execution/metric/SQLMetrics.scala    | 52 ++++++++++++------
 .../sql/execution/ui/SQLAppStatusListener.scala    | 63 +++++++++++++++++-----
 .../sql/execution/metric/SQLMetricsSuite.scala     | 39 +++++++++-----
 .../sql/execution/metric/SQLMetricsTestUtils.scala | 26 ++++++---
 .../execution/ui/SQLAppStatusListenerSuite.scala   |  5 +-
 5 files changed, 137 insertions(+), 48 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
index b7f0ab2..45b1c86 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/metric/SQLMetrics.scala
@@ -111,7 +111,8 @@ object SQLMetrics {
     // data size total (min, med, max):
     // 100GB (100MB, 1GB, 10GB)
     val acc = new SQLMetric(SIZE_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+      countFailedValues = false)
     acc
   }
 
@@ -120,14 +121,16 @@ object SQLMetrics {
     // duration(min, med, max):
     // 5s (800ms, 1s, 2s)
     val acc = new SQLMetric(TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+      countFailedValues = false)
     acc
   }
 
   def createNanoTimingMetric(sc: SparkContext, name: String): SQLMetric = {
    // Same with createTimingMetric, just normalize the unit of time to millisecond.
     val acc = new SQLMetric(NS_TIMING_METRIC, -1)
-    acc.register(sc, name = Some(s"$name total (min, med, max)"), countFailedValues = false)
+    acc.register(sc, name = Some(s"$name total (min, med, max (stageId (attemptId): taskId))"),
+      countFailedValues = false)
     acc
   }
 
@@ -142,31 +145,46 @@ object SQLMetrics {
     // probe avg (min, med, max):
     // (1.2, 2.2, 6.3)
     val acc = new SQLMetric(AVERAGE_METRIC)
-    acc.register(sc, name = Some(s"$name (min, med, max)"), countFailedValues = false)
+    acc.register(sc, name = Some(s"$name (min, med, max (stageId (attemptId): taskId))"),
+      countFailedValues = false)
     acc
   }
 
+  private def toNumberFormat(value: Long): String = {
+    val numberFormat = NumberFormat.getNumberInstance(Locale.US)
+    numberFormat.format(value.toDouble / baseForAvgMetric)
+  }
+
+  def metricNeedsMax(metricsType: String): Boolean = {
+    metricsType != SUM_METRIC
+  }
+
   /**
    * A function that defines how we aggregate the final accumulator results among all tasks,
    * and represent it in string for a SQL physical operator.
-   */
-  def stringValue(metricsType: String, values: Array[Long]): String = {
+    */
+  def stringValue(metricsType: String, values: Array[Long], maxMetrics: Array[Long]): String = {
+    // stringMetric = "(driver)" OR (stage $stageId (attempt $attemptId): task $taskId))
+    val stringMetric = if (maxMetrics.isEmpty) {
+      "(driver)"
+    } else {
+      s"(stage ${maxMetrics(1)} (attempt ${maxMetrics(2)}): task ${maxMetrics(3)})"
+    }
     if (metricsType == SUM_METRIC) {
       val numberFormat = NumberFormat.getIntegerInstance(Locale.US)
       numberFormat.format(values.sum)
     } else if (metricsType == AVERAGE_METRIC) {
-      val numberFormat = NumberFormat.getNumberInstance(Locale.US)
-
       val validValues = values.filter(_ > 0)
       val Seq(min, med, max) = {
         val metric = if (validValues.isEmpty) {
-          Seq.fill(3)(0L)
+          val zeros = Seq.fill(3)(0L)
+          zeros.map(v => toNumberFormat(v))
         } else {
           Arrays.sort(validValues)
-          Seq(validValues(0), validValues(validValues.length / 2),
-            validValues(validValues.length - 1))
+          Seq(toNumberFormat(validValues(0)), toNumberFormat(validValues(validValues.length / 2)),
+            s"${toNumberFormat(validValues(validValues.length - 1))} $stringMetric")
         }
-        metric.map(v => numberFormat.format(v.toDouble / baseForAvgMetric))
+        metric
       }
       s"\n($min, $med, $max)"
     } else {
@@ -183,13 +201,15 @@ object SQLMetrics {
       val validValues = values.filter(_ >= 0)
       val Seq(sum, min, med, max) = {
         val metric = if (validValues.isEmpty) {
-          Seq.fill(4)(0L)
+          val zeros = Seq.fill(4)(0L)
+          zeros.map(v => strFormat(v))
         } else {
           Arrays.sort(validValues)
-          Seq(validValues.sum, validValues(0), validValues(validValues.length / 2),
-            validValues(validValues.length - 1))
+          Seq(strFormat(validValues.sum), strFormat(validValues(0)),
+            strFormat(validValues(validValues.length / 2)),
+            s"${strFormat(validValues(validValues.length - 1))} $stringMetric")
         }
-        metric.map(strFormat)
+        metric
       }
       s"\n$sum ($min, $med, $max)"
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index e1100c3..64d2f33 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -103,12 +103,13 @@ class SQLAppStatusListener(
         }
       }.getOrElse(getOrCreateExecution(executionId))
 
-    // Record the accumulator IDs for the stages of this job, so that the code that keeps
-    // track of the metrics knows which accumulators to look at.
-    val accumIds = exec.metrics.map(_.accumulatorId).toSet
-    if (accumIds.nonEmpty) {
+    // Record the accumulator IDs and metric types for the stages of this job, so that the code
+    // that keeps track of the metrics knows which accumulators to look at.
+    val accumIdsAndType = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
+    if (accumIdsAndType.nonEmpty) {
       event.stageInfos.foreach { stage =>
-        stageMetrics.put(stage.stageId, new LiveStageMetrics(0, stage.numTasks, accumIds))
+        stageMetrics.put(stage.stageId, new LiveStageMetrics(stage.stageId, 0,
+          stage.numTasks, accumIdsAndType))
       }
     }
 
@@ -126,7 +127,8 @@ class SQLAppStatusListener(
     Option(stageMetrics.get(event.stageInfo.stageId)).foreach { stage =>
       if (stage.attemptId != event.stageInfo.attemptNumber) {
         stageMetrics.put(event.stageInfo.stageId,
-          new LiveStageMetrics(event.stageInfo.attemptNumber, stage.numTasks, stage.accumulatorIds))
+          new LiveStageMetrics(event.stageInfo.stageId, event.stageInfo.attemptNumber,
+            stage.numTasks, stage.accumIdsToMetricType))
       }
     }
   }
@@ -198,12 +200,17 @@ class SQLAppStatusListener(
   private def aggregateMetrics(exec: LiveExecutionData): Map[Long, String] = {
     val metricTypes = exec.metrics.map { m => (m.accumulatorId, m.metricType) }.toMap
 
-    val taskMetrics = exec.stages.toSeq
+    val liveStageMetrics = exec.stages.toSeq
       .flatMap { stageId => Option(stageMetrics.get(stageId)) }
-      .flatMap(_.metricValues())
+
+    val taskMetrics = liveStageMetrics.flatMap(_.metricValues())
+
+    val maxMetrics = liveStageMetrics.flatMap(_.maxMetricValues())
 
     val allMetrics = new mutable.HashMap[Long, Array[Long]]()
 
+    val maxMetricsFromAllStages = new mutable.HashMap[Long, Array[Long]]()
+
     taskMetrics.foreach { case (id, values) =>
       val prev = allMetrics.getOrElse(id, null)
       val updated = if (prev != null) {
@@ -214,6 +221,18 @@ class SQLAppStatusListener(
       allMetrics(id) = updated
     }
 
+    // Find the max for each metric id between all stages.
+    maxMetrics.foreach { case (id, value, taskId, stageId, attemptId) =>
+      val updated = maxMetricsFromAllStages.getOrElse(id, Array(value, stageId, attemptId, taskId))
+      if (value > updated(0)) {
+        updated(0) = value
+        updated(1) = stageId
+        updated(2) = attemptId
+        updated(3) = taskId
+      }
+      maxMetricsFromAllStages(id) = updated
+    }
+
     exec.driverAccumUpdates.foreach { case (id, value) =>
       if (metricTypes.contains(id)) {
         val prev = allMetrics.getOrElse(id, null)
@@ -229,7 +248,8 @@ class SQLAppStatusListener(
     }
 
     val aggregatedMetrics = allMetrics.map { case (id, values) =>
-      id -> SQLMetrics.stringValue(metricTypes(id), values)
+      id -> SQLMetrics.stringValue(metricTypes(id), values, maxMetricsFromAllStages.getOrElse(id,
+        Array.empty[Long]))
     }.toMap
 
     // Check the execution again for whether the aggregated metrics data has been calculated.
@@ -440,9 +460,10 @@ private class LiveExecutionData(val executionId: Long) extends LiveEntity {
 }
 
 private class LiveStageMetrics(
+    val stageId: Int,
     val attemptId: Int,
     val numTasks: Int,
-    val accumulatorIds: Set[Long]) {
+    val accumIdsToMetricType: Map[Long, String]) {
 
   /**
    * Mapping of task IDs to their respective index. Note this may contain more elements than the
@@ -461,6 +482,8 @@ private class LiveStageMetrics(
    */
   private val taskMetrics = new ConcurrentHashMap[Long, Array[Long]]()
 
+  private val  metricsIdToMaxTaskValue = new ConcurrentHashMap[Long, Array[Long]]()
+
   def registerTask(taskId: Long, taskIdx: Int): Unit = {
     taskIndices.update(taskId, taskIdx)
   }
@@ -487,7 +510,7 @@ private class LiveStageMetrics(
     }
 
     accumUpdates
-      .filter { acc => acc.update.isDefined && accumulatorIds.contains(acc.id) }
+      .filter { acc => acc.update.isDefined && accumIdsToMetricType.contains(acc.id) }
       .foreach { acc =>
        // In a live application, accumulators have Long values, but when reading from event
        // logs, they have String values. For now, assume all accumulators are Long and convert
@@ -500,14 +523,30 @@ private class LiveStageMetrics(
 
        val metricValues = taskMetrics.computeIfAbsent(acc.id, _ => new Array(numTasks))
         metricValues(taskIdx) = value
-      }
 
+        if (SQLMetrics.metricNeedsMax(accumIdsToMetricType(acc.id))) {
+          val maxMetricsTaskId = metricsIdToMaxTaskValue.computeIfAbsent(acc.id, _ => Array(value,
+            taskId))
+
+          if (value > maxMetricsTaskId.head) {
+            maxMetricsTaskId(0) = value
+            maxMetricsTaskId(1) = taskId
+          }
+        }
+      }
     if (finished) {
       completedIndices += taskIdx
     }
   }
 
   def metricValues(): Seq[(Long, Array[Long])] = taskMetrics.asScala.toSeq
+
+  // Return Seq of metric id, value, taskId, stageId, attemptId for this stage
+  def maxMetricValues(): Seq[(Long, Long, Long, Int, Int)] = {
+    metricsIdToMaxTaskValue.asScala.toSeq.map { case (id, maxMetrics) => (id, maxMetrics(0),
+      maxMetrics(1), stageId, attemptId)
+    }
+  }
 }
 
 private object SQLAppStatusListener {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index fbf97e9..fcb089e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -84,8 +84,9 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
       0L -> (("WholeStageCodegen (1)", Map(
-        "duration total (min, med, max)" -> 
{_.toString.matches(timingMetricPattern)})))
-    ), true)
+        "duration total (min, med, max (stageId (attemptId): taskId))" -> {
+          _.toString.matches(timingMetricPattern)
+        })))), true)
   }
 
   test("Aggregate metrics") {
@@ -95,9 +96,11 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = testData2.groupBy().count() // 2 partitions
     val expected1 = Seq(
       Map("number of output rows" -> 2L,
-        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"),
+        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+          aggregateMetricsPattern),
       Map("number of output rows" -> 1L,
-        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"))
+        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+          aggregateMetricsPattern))
     val shuffleExpected1 = Map(
       "records read" -> 2L,
       "local blocks read" -> 2L,
@@ -113,9 +116,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df2 = testData2.groupBy('a).count()
     val expected2 = Seq(
       Map("number of output rows" -> 4L,
-        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"),
+        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+          aggregateMetricsPattern),
       Map("number of output rows" -> 3L,
-        "avg hash probe bucket list iters (min, med, max)" -> "\n(1, 1, 1)"))
+        "avg hash probe bucket list iters (min, med, max (stageId (attemptId): 
taskId))" ->
+          aggregateMetricsPattern))
+
     val shuffleExpected2 = Map(
       "records read" -> 4L,
       "local blocks read" -> 4L,
@@ -161,9 +167,12 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
       }
       val metrics = getSparkPlanMetrics(df, 1, nodeIds, enableWholeStage).get
       nodeIds.foreach { nodeId =>
-        val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max)")
-        probes.toString.stripPrefix("\n(").stripSuffix(")").split(", ").foreach { probe =>
-          assert(probe.toDouble > 1.0)
+        val probes = metrics(nodeId)._2("avg hash probe bucket list iters (min, med, max (stageId" +
+          " (attemptId): taskId))")
+        // Extract min, med, max from the string and strip off everything else.
+        val index = probes.toString.stripPrefix("\n(").stripSuffix(")").indexOf(" (", 0)
+        probes.toString.stripPrefix("\n(").stripSuffix(")").slice(0, index).split(", ").foreach {
+          probe => assert(probe.toDouble > 1.0)
         }
       }
     }
@@ -208,9 +217,15 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     val df = Seq(1, 3, 2).toDF("id").sort('id)
     testSparkPlanMetricsWithPredicates(df, 2, Map(
       0L -> (("Sort", Map(
-        "sort time total (min, med, max)" -> 
{_.toString.matches(timingMetricPattern)},
-        "peak memory total (min, med, max)" -> 
{_.toString.matches(sizeMetricPattern)},
-        "spill size total (min, med, max)" -> 
{_.toString.matches(sizeMetricPattern)})))
+        "sort time total (min, med, max (stageId (attemptId): taskId))" -> {
+          _.toString.matches(timingMetricPattern)
+        },
+        "peak memory total (min, med, max (stageId (attemptId): taskId))" -> {
+          _.toString.matches(sizeMetricPattern)
+        },
+        "spill size total (min, med, max (stageId (attemptId): taskId))" -> {
+          _.toString.matches(sizeMetricPattern)
+        })))
     ))
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 57731e5..0c1148f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -41,16 +41,28 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
 
   protected def statusStore: SQLAppStatusStore = spark.sharedState.statusStore
 
-  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
+  // Pattern of size SQLMetric value, e.g. "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB (stage 0
+  // (attempt 0): task 4))" OR "\n96.2 MiB (32.1 MiB, 32.1 MiB, 32.1 MiB)"
   protected val sizeMetricPattern = {
     val bytes = "([0-9]+(\\.[0-9]+)?) (EiB|PiB|TiB|GiB|MiB|KiB|B)"
-    s"\\n$bytes \\($bytes, $bytes, $bytes\\)"
+    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    s"\\n$bytes \\($bytes, $bytes, $bytes( $maxMetrics)?\\)"
   }
 
-  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
+  // Pattern of timing SQLMetric value, e.g. "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms (stage 3 (attempt
+  // 0): task 217))" OR "\n2.0 ms (1.0 ms, 1.0 ms, 1.0 ms)"
   protected val timingMetricPattern = {
     val duration = "([0-9]+(\\.[0-9]+)?) (ms|s|m|h)"
-    s"\\n$duration \\($duration, $duration, $duration\\)"
+    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    s"\\n$duration \\($duration, $duration, $duration( $maxMetrics)?\\)"
+  }
+
+  // Pattern of size SQLMetric value for Aggregate tests.
+  // e.g "\n(1, 1, 0.9 (stage 1 (attempt 0): task 8)) OR "\n(1, 1, 0.9 )"
+  protected val aggregateMetricsPattern = {
+    val iters = "([0-9]+(\\.[0-9]+)?)"
+    val maxMetrics = "\\(stage ([0-9])+ \\(attempt ([0-9])+\\)\\: task ([0-9])+\\)"
+    s"\\n\\($iters, $iters, $iters( $maxMetrics)?\\)"
   }
 
   /**
@@ -86,7 +98,7 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
     }
 
     val totalNumBytesMetric = executedNode.metrics.find(
-      _.name == "written output total (min, med, max)").get
+      _.name == "written output total (min, med, max (stageId (attemptId): 
taskId))").get
     val totalNumBytes = 
metrics(totalNumBytesMetric.accumulatorId).replaceAll(",", "")
       .split(" ").head.trim.toDouble
     assert(totalNumBytes > 0)
@@ -205,7 +217,9 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       expectedMetrics: Map[Long, (String, Map[String, Any])]): Unit = {
    val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
       (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
-        (actualMetricValue: Any) => expectedMetricValue.toString === actualMetricValue))
+        (actualMetricValue: Any) => {
+          actualMetricValue.toString.matches(expectedMetricValue.toString)
+        }))
     }
    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index a8b77b8..4113c2c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -524,9 +524,10 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
     val metrics = statusStore.executionMetrics(execId)
     val driverMetric = physicalPlan.metrics("dummy")
     val driverMetric2 = physicalPlan.metrics("dummy2")
-    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType, Array(expectedAccumValue))
+    val expectedValue = SQLMetrics.stringValue(driverMetric.metricType,
+      Array(expectedAccumValue), Array.empty[Long])
     val expectedValue2 = SQLMetrics.stringValue(driverMetric2.metricType,
-      Array(expectedAccumValue2))
+      Array(expectedAccumValue2), Array.empty[Long])
 
     assert(metrics.contains(driverMetric.id))
     assert(metrics(driverMetric.id) === expectedValue)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
