Repository: spark
Updated Branches:
  refs/heads/branch-1.5 5ffd0841e -> 118ebd405
Revert "[SPARK-10640] History server fails to parse TaskCommitDenied" This reverts commit 5ffd0841e016301807b0a008af7c3346e9f59e7a. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/118ebd40 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/118ebd40 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/118ebd40 Branch: refs/heads/branch-1.5 Commit: 118ebd405a34acedb32e9f3d1cf7b5a835e17dbb Parents: 5ffd084 Author: Andrew Or <and...@databricks.com> Authored: Tue Sep 22 17:10:58 2015 -0700 Committer: Andrew Or <and...@databricks.com> Committed: Tue Sep 22 17:10:58 2015 -0700 ---------------------------------------------------------------------- .../scala/org/apache/spark/TaskEndReason.scala | 6 +----- .../org/apache/spark/util/JsonProtocol.scala | 13 ------------ .../apache/spark/util/JsonProtocolSuite.scala | 22 -------------------- 3 files changed, 1 insertion(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/main/scala/org/apache/spark/TaskEndReason.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TaskEndReason.scala b/core/src/main/scala/org/apache/spark/TaskEndReason.scala index b50354c..7a690df 100644 --- a/core/src/main/scala/org/apache/spark/TaskEndReason.scala +++ b/core/src/main/scala/org/apache/spark/TaskEndReason.scala @@ -17,17 +17,13 @@ package org.apache.spark -import java.io.{ObjectInputStream, ObjectOutputStream} +import java.io.{IOException, ObjectInputStream, ObjectOutputStream} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.executor.TaskMetrics import org.apache.spark.storage.BlockManagerId import org.apache.spark.util.Utils -// ============================================================================================== -// NOTE: new task end reasons MUST be 
accompanied with serialization logic in util.JsonProtocol! -// ============================================================================================== - /** * :: DeveloperApi :: * Various possible reasons why a task ended. The low-level TaskScheduler is supposed to retry http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index 311bb59..f742c39 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -364,10 +364,6 @@ private[spark] object JsonProtocol { ("Metrics" -> metrics) case ExecutorLostFailure(executorId) => ("Executor ID" -> executorId) - case taskCommitDenied: TaskCommitDenied => - ("Job ID" -> taskCommitDenied.jobID) ~ - ("Partition ID" -> taskCommitDenied.partitionID) ~ - ("Attempt Number" -> taskCommitDenied.attemptNumber) case _ => Utils.emptyJson } ("Reason" -> reason) ~ json @@ -773,7 +769,6 @@ private[spark] object JsonProtocol { val exceptionFailure = Utils.getFormattedClassName(ExceptionFailure) val taskResultLost = Utils.getFormattedClassName(TaskResultLost) val taskKilled = Utils.getFormattedClassName(TaskKilled) - val taskCommitDenied = Utils.getFormattedClassName(TaskCommitDenied) val executorLostFailure = Utils.getFormattedClassName(ExecutorLostFailure) val unknownReason = Utils.getFormattedClassName(UnknownReason) @@ -798,14 +793,6 @@ private[spark] object JsonProtocol { ExceptionFailure(className, description, stackTrace, fullStackTrace, metrics, None) case `taskResultLost` => TaskResultLost case `taskKilled` => TaskKilled - case `taskCommitDenied` => - // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON - // de/serialization logic was not 
added until 1.5.1. To provide backward compatibility - // for reading those logs, we need to provide default values for all the fields. - val jobId = Utils.jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1) - val partitionId = Utils.jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1) - val attemptNo = Utils.jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1) - TaskCommitDenied(jobId, partitionId, attemptNo) case `executorLostFailure` => val executorId = Utils.jsonOption(json \ "Executor ID").map(_.extract[String]) ExecutorLostFailure(executorId.getOrElse("Unknown")) http://git-wip-us.apache.org/repos/asf/spark/blob/118ebd40/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala index 373c36b..4bf6660 100644 --- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala @@ -152,7 +152,6 @@ class JsonProtocolSuite extends SparkFunSuite { testTaskEndReason(TaskResultLost) testTaskEndReason(TaskKilled) testTaskEndReason(ExecutorLostFailure("100")) - testTaskEndReason(TaskCommitDenied(2, 3, 4)) testTaskEndReason(UnknownReason) // BlockId @@ -353,17 +352,6 @@ class JsonProtocolSuite extends SparkFunSuite { assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo)) } - // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1 - test("TaskCommitDenied backward compatibility") { - val denied = TaskCommitDenied(1, 2, 3) - val oldDenied = JsonProtocol.taskEndReasonToJson(denied) - .removeField({ _._1 == "Job ID" }) - .removeField({ _._1 == "Partition ID" }) - .removeField({ _._1 == "Attempt Number" }) - val expectedDenied = TaskCommitDenied(-1, -1, -1) - 
assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied)) - } - /** -------------------------- * | Helper test running methods | * --------------------------- */ @@ -589,17 +577,7 @@ class JsonProtocolSuite extends SparkFunSuite { assertOptionEquals(r1.metrics, r2.metrics, assertTaskMetricsEquals) case (TaskResultLost, TaskResultLost) => case (TaskKilled, TaskKilled) => -<<<<<<< HEAD case (ExecutorLostFailure(execId1), ExecutorLostFailure(execId2)) => -======= - case (TaskCommitDenied(jobId1, partitionId1, attemptNumber1), - TaskCommitDenied(jobId2, partitionId2, attemptNumber2)) => - assert(jobId1 === jobId2) - assert(partitionId1 === partitionId2) - assert(attemptNumber1 === attemptNumber2) - case (ExecutorLostFailure(execId1, isNormalExit1), - ExecutorLostFailure(execId2, isNormalExit2)) => ->>>>>>> 61d4c07... [SPARK-10640] History server fails to parse TaskCommitDenied assert(execId1 === execId2) case (UnknownReason, UnknownReason) => case _ => fail("Task end reasons don't match in types!") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org