Repository: spark
Updated Branches:
  refs/heads/branch-1.5 118ebd405 -> 26187ab74


[SPARK-10640] History server fails to parse TaskCommitDenied

... simply because the code is missing!

Author: Andrew Or <and...@databricks.com>

Closes #8828 from andrewor14/task-end-reason-json.

Conflicts:
        core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
        core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/26187ab7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/26187ab7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/26187ab7

Branch: refs/heads/branch-1.5
Commit: 26187ab744687f57f295700141b5c0844949faad
Parents: 118ebd4
Author: Andrew Or <and...@databricks.com>
Authored: Tue Sep 22 16:35:43 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Tue Sep 22 17:35:05 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/TaskEndReason.scala     |  6 +++++-
 .../scala/org/apache/spark/util/JsonProtocol.scala | 13 +++++++++++++
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 17 +++++++++++++++++
 3 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/26187ab7/core/src/main/scala/org/apache/spark/TaskEndReason.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
index 7a690df..b50354c 100644
--- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala
+++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala
@@ -17,13 +17,17 @@
 
 package org.apache.spark
 
-import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
+import java.io.{ObjectInputStream, ObjectOutputStream}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.storage.BlockManagerId
 import org.apache.spark.util.Utils
 
+// ==============================================================================================
+// NOTE: new task end reasons MUST be accompanied with serialization logic in util.JsonProtocol!
+// ==============================================================================================
+
 /**
  * :: DeveloperApi ::
  * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry

http://git-wip-us.apache.org/repos/asf/spark/blob/26187ab7/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f742c39..311bb59 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -364,6 +364,10 @@ private[spark] object JsonProtocol {
         ("Metrics" -> metrics)
       case ExecutorLostFailure(executorId) =>
         ("Executor ID" -> executorId)
+      case taskCommitDenied: TaskCommitDenied =>
+        ("Job ID" -> taskCommitDenied.jobID) ~
+        ("Partition ID" -> taskCommitDenied.partitionID) ~
+        ("Attempt Number" -> taskCommitDenied.attemptNumber)
       case _ => Utils.emptyJson
     }
     ("Reason" -> reason) ~ json
@@ -769,6 +773,7 @@ private[spark] object JsonProtocol {
     val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure)
     val taskResultLost = Utils.getFormattedClassName(TaskResultLost)
     val taskKilled = Utils.getFormattedClassName(TaskKilled)
+    val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied)
     val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure)
     val unknownReason = Utils.getFormattedClassName(UnknownReason)
 
@@ -793,6 +798,14 @@ private[spark] object JsonProtocol {
         ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` => TaskKilled
+      case `taskCommitDenied` =>
+        // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
+        // de/serialization logic was not added until 1.5.1. To provide backward compatibility
+        // for reading those logs, we need to provide default values for all the fields.
+        val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
+        val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
+        val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
+        TaskCommitDenied(jobId, partitionId, attemptNo)
       case `executorLostFailure` =>
         val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String])
         ExecutorLostFailure(executorId.getOrElse("Unknown"))

http://git-wip-us.apache.org/repos/asf/spark/blob/26187ab7/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 4bf6660..25336bb 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -152,6 +152,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testTaskEndReason(TaskResultLost)
     testTaskEndReason(TaskKilled)
     testTaskEndReason(ExecutorLostFailure("100"))
+    testTaskEndReason(TaskCommitDenied(2, 3, 4))
     testTaskEndReason(UnknownReason)
 
     // BlockId
@@ -352,6 +353,17 @@ class JsonProtocolSuite extends SparkFunSuite {
     assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
   }
 
+  // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
+  test("TaskCommitDenied backward compatibility") {
+    val denied = TaskCommitDenied(1, 2, 3)
+    val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
+      .removeField({ _._1 == "Job ID" })
+      .removeField({ _._1 == "Partition ID" })
+      .removeField({ _._1 == "Attempt Number" })
+    val expectedDenied = TaskCommitDenied(-1, -1, -1)
+    assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
+  }
+
   /** -------------------------- *
    | Helper test running methods |
    * --------------------------- */
@@ -579,6 +591,11 @@ class JsonProtocolSuite extends SparkFunSuite {
       case (TaskKilled, TaskKilled) =>
       case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) =>
         assert(execId1 === execId2)
+      case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1),
+          TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) =>
+        assert(jobId1 === jobId2)
+        assert(partitionId1 === partitionId2)
+        assert(attemptNumber1 === attemptNumber2)
       case (UnknownReason, UnknownReason) =>
       case _ => fail("Task end reasons don't match in types!")
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to