Repository: spark
Updated Branches:
  refs/heads/master c2766b07b -> f3f1e14bb


[SPARK-23326][WEBUI] schedulerDelay should return 0 when the task is running

## What changes were proposed in this pull request?

When a task is still running, metrics like `executorRunTime` are not available.
As a result, `schedulerDelay` is computed as almost the same value as `duration`,
which is confusing.

This PR makes `schedulerDelay` return 0 while the task is still running, which
matches the behavior in Spark 2.2.

## How was this patch tested?

`AppStatusUtilsSuite.schedulerDelay`

Author: Shixiong Zhu <zsxw...@gmail.com>

Closes #20493 from zsxwing/SPARK-23326.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f3f1e14b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f3f1e14b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f3f1e14b

Branch: refs/heads/master
Commit: f3f1e14bb73dfdd2927d95b12d7d61d22de8a0ac
Parents: c2766b0
Author: Shixiong Zhu <zsxw...@gmail.com>
Authored: Tue Feb 6 14:42:42 2018 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Tue Feb 6 14:42:42 2018 +0800

----------------------------------------------------------------------
 .../apache/spark/status/AppStatusUtils.scala    | 11 ++-
 .../spark/status/AppStatusUtilsSuite.scala      | 89 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f3f1e14b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala 
b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
index 341bd4e..87f434d 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusUtils.scala
@@ -17,16 +17,23 @@
 
 package org.apache.spark.status
 
-import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
+import org.apache.spark.status.api.v1.TaskData
 
 private[spark] object AppStatusUtils {
 
+  private val TASK_FINISHED_STATES = Set("FAILED", "KILLED", "SUCCESS")
+
+  private def isTaskFinished(task: TaskData): Boolean = {
+    TASK_FINISHED_STATES.contains(task.status)
+  }
+
   def schedulerDelay(task: TaskData): Long = {
-    if (task.taskMetrics.isDefined && task.duration.isDefined) {
+    if (isTaskFinished(task) && task.taskMetrics.isDefined && 
task.duration.isDefined) {
       val m = task.taskMetrics.get
       schedulerDelay(task.launchTime.getTime(), fetchStart(task), 
task.duration.get,
         m.executorDeserializeTime, m.resultSerializationTime, 
m.executorRunTime)
     } else {
+      // The task is still running and the metrics like executorRunTime are 
not available.
       0L
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/f3f1e14b/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala 
b/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala
new file mode 100644
index 0000000..9e74e86
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/AppStatusUtilsSuite.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status
+
+import java.util.Date
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.status.api.v1.{TaskData, TaskMetrics}
+
+class AppStatusUtilsSuite extends SparkFunSuite {
+
+  test("schedulerDelay") {
+    val runningTask = new TaskData(
+      taskId = 0,
+      index = 0,
+      attempt = 0,
+      launchTime = new Date(1L),
+      resultFetchStart = None,
+      duration = Some(100L),
+      executorId = "1",
+      host = "localhost",
+      status = "RUNNING",
+      taskLocality = "PROCESS_LOCAL",
+      speculative = false,
+      accumulatorUpdates = Nil,
+      errorMessage = None,
+      taskMetrics = Some(new TaskMetrics(
+        executorDeserializeTime = 0L,
+        executorDeserializeCpuTime = 0L,
+        executorRunTime = 0L,
+        executorCpuTime = 0L,
+        resultSize = 0L,
+        jvmGcTime = 0L,
+        resultSerializationTime = 0L,
+        memoryBytesSpilled = 0L,
+        diskBytesSpilled = 0L,
+        peakExecutionMemory = 0L,
+        inputMetrics = null,
+        outputMetrics = null,
+        shuffleReadMetrics = null,
+        shuffleWriteMetrics = null)))
+    assert(AppStatusUtils.schedulerDelay(runningTask) === 0L)
+
+    val finishedTask = new TaskData(
+      taskId = 0,
+      index = 0,
+      attempt = 0,
+      launchTime = new Date(1L),
+      resultFetchStart = None,
+      duration = Some(100L),
+      executorId = "1",
+      host = "localhost",
+      status = "SUCCESS",
+      taskLocality = "PROCESS_LOCAL",
+      speculative = false,
+      accumulatorUpdates = Nil,
+      errorMessage = None,
+      taskMetrics = Some(new TaskMetrics(
+        executorDeserializeTime = 5L,
+        executorDeserializeCpuTime = 3L,
+        executorRunTime = 90L,
+        executorCpuTime = 10L,
+        resultSize = 100L,
+        jvmGcTime = 10L,
+        resultSerializationTime = 2L,
+        memoryBytesSpilled = 0L,
+        diskBytesSpilled = 0L,
+        peakExecutionMemory = 100L,
+        inputMetrics = null,
+        outputMetrics = null,
+        shuffleReadMetrics = null,
+        shuffleWriteMetrics = null)))
+    assert(AppStatusUtils.schedulerDelay(finishedTask) === 3L)
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to