Repository: spark
Updated Branches:
  refs/heads/master 9f3e59a16 -> 88875d941


[SPARK-10558][CORE] Fix wrong executor state in Master

`ExecutorAdded` should only be sent to `AppClient` once the worker reports the
executor state back as `LOADING`; otherwise, because of a concurrency issue,
`AppClient` may receive `ExecutorAdded` first and only then receive
`ExecutorUpdated` with the `LOADING` state.

Also, Master changes the executor state from `LAUNCHING` to `RUNNING` (when
`AppClient` reports the state back as `RUNNING`) and then to `LOADING` (when the
worker reports the state back as `LOADING`). The correct order should be
`LAUNCHING` -> `LOADING` -> `RUNNING`.

The state is also shown incorrectly in the Master UI: the executor state should
be `RUNNING` rather than `LOADING`:

![screen shot 2015-09-11 at 2 30 28 pm](https://cloud.githubusercontent.com/assets/850797/9809254/3155d840-5899-11e5-8cdf-ad06fef75762.png)

Author: jerryshao <ss...@hortonworks.com>

Closes #8714 from jerryshao/SPARK-10558.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/88875d94
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/88875d94
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/88875d94

Branch: refs/heads/master
Commit: 88875d9413ec7d64a88d40857ffcf97b5853a7f2
Parents: 9f3e59a
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Nov 25 11:42:53 2015 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Nov 25 11:42:53 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/ExecutorState.scala |  2 +-
 .../org/apache/spark/deploy/client/AppClient.scala    |  3 ---
 .../scala/org/apache/spark/deploy/master/Master.scala | 14 +++++++++++---
 .../scala/org/apache/spark/deploy/worker/Worker.scala |  2 +-
 4 files changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala 
b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
index efa88c6..69c98e2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ExecutorState.scala
@@ -19,7 +19,7 @@ package org.apache.spark.deploy
 
 private[deploy] object ExecutorState extends Enumeration {
 
-  val LAUNCHING, LOADING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
+  val LAUNCHING, RUNNING, KILLED, FAILED, LOST, EXITED = Value
 
   type ExecutorState = Value
 

http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
index afab362..df6ba7d 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/AppClient.scala
@@ -178,9 +178,6 @@ private[spark] class AppClient(
         val fullId = appId + "/" + id
         logInfo("Executor added: %s on %s (%s) with %d cores".format(fullId, 
workerId, hostPort,
           cores))
-        // FIXME if changing master and `ExecutorAdded` happen at the same 
time (the order is not
-        // guaranteed), `ExecutorStateChanged` may be sent to a dead master.
-        sendToMaster(ExecutorStateChanged(appId.get, id, 
ExecutorState.RUNNING, None, None))
         listener.executorAdded(fullId, workerId, hostPort, cores, memory)
 
       case ExecutorUpdated(id, state, message, exitStatus) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b25a487..9952c97 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -253,9 +253,17 @@ private[deploy] class Master(
       execOption match {
         case Some(exec) => {
           val appInfo = idToApp(appId)
+          val oldState = exec.state
           exec.state = state
-          if (state == ExecutorState.RUNNING) { appInfo.resetRetryCount() }
+
+          if (state == ExecutorState.RUNNING) {
+            assert(oldState == ExecutorState.LAUNCHING,
+              s"executor $execId state transfer from $oldState to RUNNING is 
illegal")
+            appInfo.resetRetryCount()
+          }
+
           exec.application.driver.send(ExecutorUpdated(execId, state, message, 
exitStatus))
+
           if (ExecutorState.isFinished(state)) {
             // Remove this executor from the worker and app
             logInfo(s"Removing executor ${exec.fullId} because it is $state")
@@ -702,8 +710,8 @@ private[deploy] class Master(
     worker.addExecutor(exec)
     worker.endpoint.send(LaunchExecutor(masterUrl,
       exec.application.id, exec.id, exec.application.desc, exec.cores, 
exec.memory))
-    exec.application.driver.send(ExecutorAdded(
-      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
+    exec.application.driver.send(
+      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, 
exec.memory))
   }
 
   private def registerWorker(worker: WorkerInfo): Boolean = {

http://git-wip-us.apache.org/repos/asf/spark/blob/88875d94/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala 
b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index a45867e..418faf8 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -469,7 +469,7 @@ private[deploy] class Worker(
             executorDir,
             workerUri,
             conf,
-            appLocalDirs, ExecutorState.LOADING)
+            appLocalDirs, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to