Repository: spark
Updated Branches:
  refs/heads/master 020ff8cd3 -> 27fe6bacc


[SPARK-13604][CORE] Sync worker's state after registering with master

## What changes were proposed in this pull request?

Here are all the cases in which the Master cannot talk to a Worker for a while 
and then the network comes back.

1. Master doesn't know about the network issue (the Worker has not yet timed out)

  a. Worker doesn't know about the network issue (onDisconnected is not called)
    - Worker keeps sending Heartbeat. Neither Worker nor Master knows about the 
network issue, so nothing happens. (Eventually, Master will notice the heartbeat 
timeout if the network is not recovered.)

  b. Worker knows about the network issue (onDisconnected is called)
    - Worker stops sending Heartbeat and sends `RegisterWorker` to Master. Master 
replies `RegisterWorkerFailed("Duplicate worker ID")`, and Worker calls 
`System.exit(1)`. (Eventually, Master will notice the heartbeat timeout if the 
network is not recovered.) (This may leak driver processes. See 
[SPARK-13602](https://issues.apache.org/jira/browse/SPARK-13602).)

2. Worker times out (Master knows about the network issue). In this case, Master 
removes the Worker and its executors and drivers.

  a. Worker doesn't know about the network issue (onDisconnected is not called)
    - Worker keeps sending Heartbeat.
    - If the network comes back and Master receives a Heartbeat, Master sends 
`ReconnectWorker` to Worker.
    - Worker sends `RegisterWorker` to Master.
    - Master accepts `RegisterWorker` but doesn't know about the executors and 
drivers in the Worker. (This may leak executors.)

  b. Worker knows about the network issue (onDisconnected is called)
    - Worker stops sending `Heartbeat` and sends `RegisterWorker` to Master.
    - Master accepts `RegisterWorker` but doesn't know about the executors and 
drivers in the Worker. (This may leak executors.)

This PR fixes the executor and driver leaks in cases 2.a and 2.b, when a Worker 
reregisters with the Master. The approach is to have the Worker send 
`WorkerLatestState` to sync its state after registering with the Master 
successfully. The Master then asks the Worker to kill the executors and drivers 
it doesn't know about.

Note: the Worker cannot simply kill executors itself after registering with the 
Master, because `LaunchExecutor` and `RegisteredWorker` are processed in two 
different threads in the Worker. If `LaunchExecutor` happens before 
`RegisteredWorker`, the Worker's executor list may contain new executors by the 
time the Master accepts `RegisterWorker`, and these executors should not be 
killed. So the Worker sends its list to the Master and lets the Master tell the 
Worker which executors and drivers to kill.
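
To make the reconciliation step concrete, here is a minimal, hypothetical sketch 
of the logic described above, not the actual patch (the real `WorkerLatestState` 
handling is in the diff below). `ReconcileSketch`, `TrackedState`, 
`unknownOnMaster`, and the sample IDs are illustrative stand-ins; only the shape 
of the worker's report mirrors the real `WorkerLatestState` message.

```scala
object ReconcileSketch {
  // Simplified stand-ins for the real ExecutorDescription / WorkerLatestState.
  final case class ExecutorDescription(appId: String, execId: Int)
  final case class WorkerLatestState(
      id: String,
      executors: Seq[ExecutorDescription],
      driverIds: Seq[String])

  // What the Master believes is running on a given worker.
  final case class TrackedState(executors: Set[(String, Int)], driverIds: Set[String])

  // The Master compares the Worker's report against its own bookkeeping and
  // returns the orphaned executors and drivers that the Worker should kill.
  def unknownOnMaster(
      report: WorkerLatestState,
      tracked: TrackedState): (Seq[ExecutorDescription], Seq[String]) = {
    val orphanExecutors =
      report.executors.filterNot(e => tracked.executors((e.appId, e.execId)))
    val orphanDrivers = report.driverIds.filterNot(tracked.driverIds)
    (orphanExecutors, orphanDrivers)
  }

  def main(args: Array[String]): Unit = {
    // The Worker reports everything it is currently running after reregistering.
    val report = WorkerLatestState(
      id = "worker-1",
      executors = Seq(ExecutorDescription("app-1", 0), ExecutorDescription("app-1", 1)),
      driverIds = Seq("driver-0"))
    // The Master only tracks executor ("app-1", 0); executor 1 and driver-0 are orphans.
    val tracked = TrackedState(executors = Set(("app-1", 0)), driverIds = Set.empty)
    println(unknownOnMaster(report, tracked))
    // (List(ExecutorDescription(app-1,1)),List(driver-0))
  }
}
```

The key design point, as noted above, is that the Master owns the decision: the 
Worker only reports what it is running and kills only what the Master tells it to.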

## How was this patch tested?

test("SPARK-13604: Master should ask Worker kill unknown executors and drivers")

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #11455 from zsxwing/orphan-executors.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27fe6bac
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27fe6bac
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27fe6bac

Branch: refs/heads/master
Commit: 27fe6bacc532184ef6e8a2a24cd07f2c9188004e
Parents: 020ff8c
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Thu Mar 10 16:59:14 2016 -0800
Committer: Andrew Or <and...@databricks.com>
Committed: Thu Mar 10 16:59:14 2016 -0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/DeployMessage.scala | 10 ++++
 .../org/apache/spark/deploy/master/Master.scala | 24 +++++++++
 .../org/apache/spark/deploy/worker/Worker.scala |  5 ++
 .../spark/deploy/master/MasterSuite.scala       | 56 ++++++++++++++++++--
 4 files changed, 90 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27fe6bac/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 3e78c7a..34c0696 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -64,6 +64,16 @@ private[deploy] object DeployMessages {
   case class WorkerSchedulerStateResponse(id: String, executors: List[ExecutorDescription],
      driverIds: Seq[String])
 
+  /**
+   * A worker will send this message to the master when it registers with the master. Then the
+   * master will compare them with the executors and drivers in the master and tell the worker to
+   * kill the unknown executors and drivers.
+   */
+  case class WorkerLatestState(
+      id: String,
+      executors: Seq[ExecutorDescription],
+      driverIds: Seq[String]) extends DeployMessage
+
   case class Heartbeat(workerId: String, worker: RpcEndpointRef) extends DeployMessage
 
   // Master to Worker

http://git-wip-us.apache.org/repos/asf/spark/blob/27fe6bac/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index ff8d29f..6b9b140 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -368,6 +368,30 @@ private[deploy] class Master(
       if (canCompleteRecovery) { completeRecovery() }
     }
 
+    case WorkerLatestState(workerId, executors, driverIds) =>
+      idToWorker.get(workerId) match {
+        case Some(worker) =>
+          for (exec <- executors) {
+            val executorMatches = worker.executors.exists {
+              case (_, e) => e.application.id == exec.appId && e.id == exec.execId
+            }
+            if (!executorMatches) {
+              // master doesn't recognize this executor. So just tell worker to kill it.
+              worker.endpoint.send(KillExecutor(masterUrl, exec.appId, exec.execId))
+            }
+          }
+
+          for (driverId <- driverIds) {
+            val driverMatches = worker.drivers.exists { case (id, _) => id == driverId }
+            if (!driverMatches) {
+              // master doesn't recognize this driver. So just tell worker to kill it.
+              worker.endpoint.send(KillDriver(driverId))
+            }
+          }
+        case None =>
+          logWarning("Worker state from unknown worker: " + workerId)
+      }
+
     case UnregisterApplication(applicationId) =>
       logInfo(s"Received unregister request from application $applicationId")
       idToApp.get(applicationId).foreach(finishApplication)

http://git-wip-us.apache.org/repos/asf/spark/blob/27fe6bac/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 283db6c..c18c8c7 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -374,6 +374,11 @@ private[deploy] class Worker(
          }, CLEANUP_INTERVAL_MILLIS, CLEANUP_INTERVAL_MILLIS, TimeUnit.MILLISECONDS)
         }
 
+        val execs = executors.values.map { e =>
+          new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
+        }
+        masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))
+
       case RegisterWorkerFailed(message) =>
         if (!registered) {
           logError("Worker registration failed: " + message)

http://git-wip-us.apache.org/repos/asf/spark/blob/27fe6bac/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index ce00807..7cbe4e3 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -18,22 +18,36 @@
 package org.apache.spark.deploy.master
 
 import java.util.Date
+import java.util.concurrent.ConcurrentLinkedQueue
 
+import scala.collection.JavaConverters._
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.language.postfixOps
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.{Matchers, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually
 import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
 
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy._
-import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.deploy.DeployMessages._
+import org.apache.spark.rpc.{RpcEndpoint, RpcEnv}
 
-class MasterSuite extends SparkFunSuite with Matchers with Eventually with PrivateMethodTester {
+class MasterSuite extends SparkFunSuite
+  with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
+
+  private var _master: Master = _
+
+  after {
+    if (_master != null) {
+      _master.rpcEnv.shutdown()
+      _master.rpcEnv.awaitTermination()
+      _master = null
+    }
+  }
 
   test("can use a custom recovery mode factory") {
     val conf = new SparkConf(loadDefaults = false)
@@ -357,10 +371,11 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
   private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
 
   private def makeMaster(conf: SparkConf = new SparkConf): Master = {
+    assert(_master === null, "Some Master's RpcEnv is leaked in tests")
     val securityMgr = new SecurityManager(conf)
     val rpcEnv = RpcEnv.create(Master.SYSTEM_NAME, "localhost", 0, conf, securityMgr)
-    val master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
-    master
+    _master = new Master(rpcEnv, rpcEnv.address, 0, securityMgr, conf)
+    _master
   }
 
   private def makeAppInfo(
@@ -386,4 +401,35 @@ class MasterSuite extends SparkFunSuite with Matchers with Eventually with Priva
     master.invokePrivate(_scheduleExecutorsOnWorkers(appInfo, workerInfos, spreadOut))
   }
 
+  test("SPARK-13604: Master should ask Worker kill unknown executors and drivers") {
+    val master = makeMaster()
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askWithRetry[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+
+    val killedExecutors = new ConcurrentLinkedQueue[(String, Int)]()
+    val killedDrivers = new ConcurrentLinkedQueue[String]()
+    val fakeWorker = master.rpcEnv.setupEndpoint("worker", new RpcEndpoint {
+      override val rpcEnv: RpcEnv = master.rpcEnv
+
+      override def receive: PartialFunction[Any, Unit] = {
+        case KillExecutor(_, appId, execId) => killedExecutors.add(appId, execId)
+        case KillDriver(driverId) => killedDrivers.add(driverId)
+      }
+    })
+
+    master.self.ask(
+      RegisterWorker("1", "localhost", 9999, fakeWorker, 10, 1024, "http://localhost:8080"))
+    val executors = (0 until 3).map { i =>
+      new ExecutorDescription(appId = i.toString, execId = i, 2, ExecutorState.RUNNING)
+    }
+    master.self.send(WorkerLatestState("1", executors, driverIds = Seq("0", "1", "2")))
+
+    eventually(timeout(10.seconds)) {
+      assert(killedExecutors.asScala.toList.sorted === List("0" -> 0, "1" -> 1, "2" -> 2))
+      assert(killedDrivers.asScala.toList.sorted === List("0", "1", "2"))
+    }
+  }
 }

