Repository: spark
Updated Branches:
  refs/heads/branch-2.2 220943d85 -> 53212c32b


[SPARK-12552][CORE] Correctly count the driver resource when recovering from failure for Master

Currently in Standalone HA mode, the driver's resource usage is not correctly
counted by the Master when recovering from a failure, which leads to unexpected
behaviors such as negative resource values in the UI.
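
To see where the negative values come from: on recovery the Master tracked a
re-registered driver but never added its resources to the worker's usage
counters, while driver removal always subtracts them. A minimal sketch of the
removal side, paraphrased from the Spark 2.x WorkerInfo class (condensed, not
the full class):

    // Paraphrased sketch of Spark 2.x WorkerInfo.removeDriver.
    def removeDriver(driver: DriverInfo): Unit = {
      drivers -= driver.id
      memoryUsed -= driver.desc.mem   // subtracted even if never added on recovery,
      coresUsed -= driver.desc.cores  // so usage can drop below zero in the UI
    }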

This patch fixes the issue by also counting the driver's resource usage during recovery.
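
Concretely, the recovery path now calls WorkerInfo.addDriver instead of
assigning into worker.drivers directly, so the counters are updated
symmetrically with removeDriver above. A matching sketch, again paraphrased
from the Spark 2.x WorkerInfo class:

    // Paraphrased sketch of Spark 2.x WorkerInfo.addDriver.
    def addDriver(driver: DriverInfo): Unit = {
      drivers(driver.id) = driver     // track the driver on this worker
      memoryUsed += driver.desc.mem   // count the driver's memory
      coresUsed += driver.desc.cores  // count the driver's cores
    }

The old code, worker.drivers(driverId) = driver, performed only the first of
these three steps.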

It also changes a recovered application's state to `RUNNING` once recovery
completes; previously the state remained `WAITING` even after the application
was fully recovered.
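
The new MasterSuite test below drives the full recovery handshake. As a quick
mental model (appId, workerId, executors and driverIds are stand-in names, and
the comments describe the expected flow rather than verbatim Master code):

    // Recovery handshake exercised by the new test (sketch):
    master.self.send(MasterChangeAcknowledged(appId))   // app re-registers -> state WAITING
    master.self.send(WorkerSchedulerStateResponse(
      workerId, executors, driverIds))                  // worker reports its executors and
                                                        // drivers; the Master now counts
                                                        // driver resources via addDriver
    // Once every node has reported back, completeRecovery():
    //   - flips WAITING applications to RUNNING (this patch)
    //   - sets the Master's state to RecoveryState.ALIVE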

andrewor14 please help to review, thanks a lot.

Author: jerryshao <ss...@hortonworks.com>

Closes #10506 from jerryshao/SPARK-12552.

(cherry picked from commit 9eb095243b0bd8a2627db6dc36daa4561ef3c08b)
Signed-off-by: Wenchen Fan <wenc...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/53212c32
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/53212c32
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/53212c32

Branch: refs/heads/branch-2.2
Commit: 53212c32bc39dd86a2bb0f9c5a1f747591e545a0
Parents: 220943d
Author: jerryshao <ss...@hortonworks.com>
Authored: Wed Jun 14 08:12:15 2017 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Wed Jun 14 08:12:53 2017 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/master/Master.scala |   5 +-
 .../spark/deploy/master/MasterSuite.scala       | 130 ++++++++++++++++++-
 2 files changed, 131 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/53212c32/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 96b53c6..cbc5aae 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -367,7 +367,7 @@ private[deploy] class Master(
             drivers.find(_.id == driverId).foreach { driver =>
               driver.worker = Some(worker)
               driver.state = DriverState.RUNNING
-              worker.drivers(driverId) = driver
+              worker.addDriver(driver)
             }
           }
         case None =>
@@ -547,6 +547,9 @@ private[deploy] class Master(
     workers.filter(_.state == WorkerState.UNKNOWN).foreach(removeWorker)
     apps.filter(_.state == ApplicationState.UNKNOWN).foreach(finishApplication)
 
+    // Update the state of recovered apps to RUNNING
+    apps.filter(_.state == ApplicationState.WAITING).foreach(_.state = ApplicationState.RUNNING)
+
     // Reschedule drivers which were not claimed by any workers
     drivers.filter(_.worker.isEmpty).foreach { d =>
       logWarning(s"Driver ${d.id} was not found after master recovery")

http://git-wip-us.apache.org/repos/asf/spark/blob/53212c32/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 5392646..4f432e4 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -21,12 +21,15 @@ import java.util.Date
 import java.util.concurrent.ConcurrentLinkedQueue
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.io.Source
 import scala.language.postfixOps
+import scala.reflect.ClassTag
 
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
+import org.mockito.Mockito.{mock, when}
 import org.scalatest.{BeforeAndAfter, Matchers, PrivateMethodTester}
 import org.scalatest.concurrent.Eventually
 import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
@@ -34,7 +37,8 @@ import other.supplier.{CustomPersistenceEngine, CustomRecoveryModeFactory}
 import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
 import org.apache.spark.deploy._
 import org.apache.spark.deploy.DeployMessages._
-import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEnv}
+import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
+import org.apache.spark.serializer
 
 class MasterSuite extends SparkFunSuite
   with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
@@ -134,6 +138,81 @@ class MasterSuite extends SparkFunSuite
     CustomRecoveryModeFactory.instantiationAttempts should be > instantiationAttempts
   }
 
+  test("master correctly recover the application") {
+    val conf = new SparkConf(loadDefaults = false)
+    conf.set("spark.deploy.recoveryMode", "CUSTOM")
+    conf.set("spark.deploy.recoveryMode.factory",
+      classOf[FakeRecoveryModeFactory].getCanonicalName)
+    conf.set("spark.master.rest.enabled", "false")
+
+    val fakeAppInfo = makeAppInfo(1024)
+    val fakeWorkerInfo = makeWorkerInfo(8192, 16)
+    val fakeDriverInfo = new DriverInfo(
+      startTime = 0,
+      id = "test_driver",
+      desc = new DriverDescription(
+        jarUrl = "",
+        mem = 1024,
+        cores = 1,
+        supervise = false,
+        command = new Command("", Nil, Map.empty, Nil, Nil, Nil)),
+      submitDate = new Date())
+
+    // Build the fake recovery data
+    FakeRecoveryModeFactory.persistentData.put(s"app_${fakeAppInfo.id}", fakeAppInfo)
+    FakeRecoveryModeFactory.persistentData.put(s"driver_${fakeDriverInfo.id}", fakeDriverInfo)
+    FakeRecoveryModeFactory.persistentData.put(s"worker_${fakeWorkerInfo.id}", fakeWorkerInfo)
+
+    var master: Master = null
+    try {
+      master = makeMaster(conf)
+      master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+      // Wait until the Master recovers from the checkpoint data.
+      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+        master.idToApp.size should be(1)
+      }
+
+      master.idToApp.keySet should be(Set(fakeAppInfo.id))
+      getDrivers(master) should be(Set(fakeDriverInfo))
+      master.workers should be(Set(fakeWorkerInfo))
+
+      // Notify Master about the executor and driver info so that recovery completes correctly.
+      val fakeExecutors = List(
+        new ExecutorDescription(fakeAppInfo.id, 0, 8, ExecutorState.RUNNING),
+        new ExecutorDescription(fakeAppInfo.id, 0, 7, ExecutorState.RUNNING))
+
+      fakeAppInfo.state should be(ApplicationState.UNKNOWN)
+      fakeWorkerInfo.coresFree should be(16)
+      fakeWorkerInfo.coresUsed should be(0)
+
+      master.self.send(MasterChangeAcknowledged(fakeAppInfo.id))
+      eventually(timeout(1 second), interval(10 milliseconds)) {
+        // Application state should be WAITING after the "MasterChangeAcknowledged" event is handled.
+        fakeAppInfo.state should be(ApplicationState.WAITING)
+      }
+
+      master.self.send(
+        WorkerSchedulerStateResponse(fakeWorkerInfo.id, fakeExecutors, Seq(fakeDriverInfo.id)))
+
+      eventually(timeout(5 seconds), interval(100 milliseconds)) {
+        getState(master) should be(RecoveryState.ALIVE)
+      }
+
+      // If the driver's resources are also counted, free cores should be 0
+      fakeWorkerInfo.coresFree should be(0)
+      fakeWorkerInfo.coresUsed should be(16)
+      // State of application should be RUNNING
+      fakeAppInfo.state should be(ApplicationState.RUNNING)
+    } finally {
+      if (master != null) {
+        master.rpcEnv.shutdown()
+        master.rpcEnv.awaitTermination()
+        master = null
+        FakeRecoveryModeFactory.persistentData.clear()
+      }
+    }
+  }
+
   test("master/worker web ui available") {
     implicit val formats = org.json4s.DefaultFormats
     val conf = new SparkConf()
@@ -394,6 +473,9 @@ class MasterSuite extends SparkFunSuite
   // ==========================================
 
   private val _scheduleExecutorsOnWorkers = PrivateMethod[Array[Int]]('scheduleExecutorsOnWorkers)
+  private val _drivers = PrivateMethod[HashSet[DriverInfo]]('drivers)
+  private val _state = PrivateMethod[RecoveryState.Value]('state)
+
   private val workerInfo = makeWorkerInfo(4096, 10)
   private val workerInfos = Array(workerInfo, workerInfo, workerInfo)
 
@@ -412,12 +494,18 @@ class MasterSuite extends SparkFunSuite
     val desc = new ApplicationDescription(
       "test", maxCores, memoryPerExecutorMb, null, "", None, None, 
coresPerExecutor)
     val appId = System.currentTimeMillis.toString
-    new ApplicationInfo(0, appId, desc, new Date, null, Int.MaxValue)
+    val endpointRef = mock(classOf[RpcEndpointRef])
+    val mockAddress = mock(classOf[RpcAddress])
+    when(endpointRef.address).thenReturn(mockAddress)
+    new ApplicationInfo(0, appId, desc, new Date, endpointRef, Int.MaxValue)
   }
 
   private def makeWorkerInfo(memoryMb: Int, cores: Int): WorkerInfo = {
     val workerId = System.currentTimeMillis.toString
-    new WorkerInfo(workerId, "host", 100, cores, memoryMb, null, "http://localhost:80")
+    val endpointRef = mock(classOf[RpcEndpointRef])
+    val mockAddress = mock(classOf[RpcAddress])
+    when(endpointRef.address).thenReturn(mockAddress)
+    new WorkerInfo(workerId, "host", 100, cores, memoryMb, endpointRef, "http://localhost:80")
   }
 
   private def scheduleExecutorsOnWorkers(
@@ -499,4 +587,40 @@ class MasterSuite extends SparkFunSuite
       assert(receivedMasterAddress === RpcAddress("localhost2", 10000))
     }
   }
+
+  private def getDrivers(master: Master): HashSet[DriverInfo] = {
+    master.invokePrivate(_drivers())
+  }
+
+  private def getState(master: Master): RecoveryState.Value = {
+    master.invokePrivate(_state())
+  }
+}
+
+private class FakeRecoveryModeFactory(conf: SparkConf, ser: serializer.Serializer)
+    extends StandaloneRecoveryModeFactory(conf, ser) {
+  import FakeRecoveryModeFactory.persistentData
+
+  override def createPersistenceEngine(): PersistenceEngine = new PersistenceEngine {
+
+    override def unpersist(name: String): Unit = {
+      persistentData.remove(name)
+    }
+
+    override def persist(name: String, obj: Object): Unit = {
+      persistentData(name) = obj
+    }
+
+    override def read[T: ClassTag](prefix: String): Seq[T] = {
+      persistentData.filter(_._1.startsWith(prefix)).map(_._2.asInstanceOf[T]).toSeq
+    }
+  }
+
+  override def createLeaderElectionAgent(master: LeaderElectable): LeaderElectionAgent = {
+    new MonarchyLeaderAgent(master)
+  }
+}
+
+private object FakeRecoveryModeFactory {
+  val persistentData = new HashMap[String, Object]()
 }

