Repository: spark
Updated Branches:
  refs/heads/master 277b1924b -> b2463fad7


[SPARK-22145][MESOS] fix supervise with checkpointing on mesos

## What changes were proposed in this pull request?

- Fixes the issue where the frameworkId recovered from checkpointed data 
overwrote the one sent by the dispatcher.
- Keeps the submission driver id as the only index for all data structures in 
the dispatcher.
- Allocates a different task id per driver retry, to satisfy the Mesos 
requirement that task ids be unique; check the relevant ticket for the details 
on that. A sketch of the id scheme follows.
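
For illustration, a minimal sketch of the id scheme introduced in the diff 
below. Helper names mirror the patch, but the signatures are simplified here 
(the real helpers take a `MesosDriverDescription`):

```scala
// A retried driver keeps its submission id, while every retry attempt gets a
// fresh Mesos task id of the form "<submissionId>-retry-<n>".
val RETRY_SEP = "-retry-"

def getDriverTaskId(submissionId: String, retries: Option[Int]): String =
  retries.map(n => s"$submissionId$RETRY_SEP$n").getOrElse(submissionId)

def getSubmissionIdFromTaskId(taskId: String): String =
  taskId.split(RETRY_SEP).head

// e.g. getDriverTaskId("driver-20170926223339-0001", Some(2))
//        == "driver-20170926223339-0001-retry-2"
```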
## How was this patch tested?

Manually tested this with DC/OS 1.10. Launched a streaming job with 
checkpointing to HDFS, made the driver fail several times, and observed the 
behavior shown in the screenshots below (a minimal sketch of such a test job 
comes first).
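
A minimal sketch of the kind of job used for this test, assuming a socket 
source; the checkpoint path, app name, host, and port are illustrative 
assumptions, not taken from the actual test:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SuperviseCheckpointTest {
  // Assumed HDFS path; any durable location visible to a relaunched driver works.
  val checkpointDir = "hdfs:///tmp/supervise-test"

  def createContext(): StreamingContext = {
    val conf = new SparkConf().setAppName("supervise-checkpoint-test")
    val ssc = new StreamingContext(conf, Seconds(5))
    ssc.checkpoint(checkpointDir)
    // Trivial work; the point is only to exercise checkpoint recovery.
    ssc.socketTextStream("localhost", 9999).count().print()
    ssc
  }

  def main(args: Array[String]): Unit = {
    // On relaunch this restores the context (and its conf) from the checkpoint,
    // which is where the stale frameworkId used to leak back in.
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()
  }
}
```
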
![image](https://user-images.githubusercontent.com/7945591/30940500-f7d2a744-a3e9-11e7-8c56-f2ccbb271e80.png)

![image](https://user-images.githubusercontent.com/7945591/30940550-19bc15de-a3ea-11e7-8a11-f48abfe36720.png)

![image](https://user-images.githubusercontent.com/7945591/30940524-083ea308-a3ea-11e7-83ae-00d3fa17b928.png)

![image](https://user-images.githubusercontent.com/7945591/30940579-2f0fb242-a3ea-11e7-82f9-86179da28b8c.png)

![image](https://user-images.githubusercontent.com/7945591/30940591-3b561b0e-a3ea-11e7-9dbd-e71912bb2ef3.png)

![image](https://user-images.githubusercontent.com/7945591/30940605-49c810ca-a3ea-11e7-8af5-67930851fd38.png)

![image](https://user-images.githubusercontent.com/7945591/30940631-59f4a288-a3ea-11e7-88cb-c3741b72bb13.png)

![image](https://user-images.githubusercontent.com/7945591/30940642-62346c9e-a3ea-11e7-8935-82e494925f67.png)

![image](https://user-images.githubusercontent.com/7945591/30940653-6c46d53c-a3ea-11e7-8dd1-5840d484d28c.png)

Author: Stavros Kontopoulos <st.kontopou...@gmail.com>

Closes #19374 from skonto/fix_retry.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b2463fad
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b2463fad
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b2463fad

Branch: refs/heads/master
Commit: b2463fad718d25f564d62c50d587610de3d0c5bd
Parents: 277b192
Author: Stavros Kontopoulos <st.kontopou...@gmail.com>
Authored: Thu Nov 2 13:25:48 2017 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Nov 2 13:25:48 2017 +0000

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   |  1 +
 .../cluster/mesos/MesosClusterScheduler.scala   | 90 ++++++++++++--------
 .../org/apache/spark/streaming/Checkpoint.scala |  3 +-
 3 files changed, 57 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b2463fad/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 6f25d34..c7dd635 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -310,6 +310,7 @@ class SparkContext(config: SparkConf) extends Logging {
    * (i.e.
    *  in case of local spark app something like 'local-1433865536131'
    *  in case of YARN something like 'application_1433865536131_34483'
+   *  in case of MESOS something like 'driver-20170926223339-0001'
    * )
    */
   def applicationId: String = _applicationId

http://git-wip-us.apache.org/repos/asf/spark/blob/b2463fad/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 8247026..de846c8 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -134,22 +134,24 @@ private[spark] class MesosClusterScheduler(
   private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
+  // Keyed by submission id
   private val finishedDrivers =
     new mutable.ArrayBuffer[MesosClusterSubmissionState](retainedDrivers)
   private var frameworkId: String = null
-  // Holds all the launched drivers and current launch state, keyed by driver id.
+  // Holds all the launched drivers and current launch state, keyed by submission id.
   private val launchedDrivers = new mutable.HashMap[String, MesosClusterSubmissionState]()
   // Holds a map of driver id to expected slave id that is passed to Mesos for reconciliation.
   // All drivers that are loaded after failover are added here, as we need get the latest
-  // state of the tasks from Mesos.
+  // state of the tasks from Mesos. Keyed by task Id.
   private val pendingRecover = new mutable.HashMap[String, SlaveID]()
-  // Stores all the submitted drivers that hasn't been launched.
+  // Stores all the submitted drivers that hasn't been launched, keyed by submission id
   private val queuedDrivers = new ArrayBuffer[MesosDriverDescription]()
-  // All supervised drivers that are waiting to retry after termination.
+  // All supervised drivers that are waiting to retry after termination, keyed by submission id
   private val pendingRetryDrivers = new ArrayBuffer[MesosDriverDescription]()
   private val queuedDriversState = engineFactory.createEngine("driverQueue")
   private val launchedDriversState = engineFactory.createEngine("launchedDrivers")
   private val pendingRetryDriversState = engineFactory.createEngine("retryList")
+  private final val RETRY_SEP = "-retry-"
   // Flag to mark if the scheduler is ready to be called, which is until the scheduler
   // is registered with Mesos master.
   @volatile protected var ready = false
@@ -192,8 +194,8 @@ private[spark] class MesosClusterScheduler(
       // 3. Check if it's in the retry list.
       // 4. Check if it has already completed.
       if (launchedDrivers.contains(submissionId)) {
-        val task = launchedDrivers(submissionId)
-        schedulerDriver.killTask(task.taskId)
+        val state = launchedDrivers(submissionId)
+        schedulerDriver.killTask(state.taskId)
         k.success = true
         k.message = "Killing running driver"
       } else if (removeFromQueuedDrivers(submissionId)) {
@@ -275,7 +277,7 @@ private[spark] class MesosClusterScheduler(
   private def recoverState(): Unit = {
     stateLock.synchronized {
      launchedDriversState.fetchAll[MesosClusterSubmissionState]().foreach { state =>
-        launchedDrivers(state.taskId.getValue) = state
+        launchedDrivers(state.driverDescription.submissionId) = state
         pendingRecover(state.taskId.getValue) = state.slaveId
       }
      queuedDriversState.fetchAll[MesosDriverDescription]().foreach(d => queuedDrivers += d)
@@ -353,7 +355,8 @@ private[spark] class MesosClusterScheduler(
               .setSlaveId(slaveId)
               .setState(MesosTaskState.TASK_STAGING)
               .build()
-            launchedDrivers.get(taskId).map(_.mesosTaskStatus.getOrElse(newStatus))
+            launchedDrivers.get(getSubmissionIdFromTaskId(taskId))
+              .map(_.mesosTaskStatus.getOrElse(newStatus))
               .getOrElse(newStatus)
         }
         // TODO: Page the status updates to avoid trying to reconcile
@@ -369,10 +372,19 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def getDriverFrameworkID(desc: MesosDriverDescription): String = {
-    val retries = desc.retryState.map { d => s"-retry-${d.retries.toString}" }.getOrElse("")
+    val retries = desc.retryState.map { d => s"${RETRY_SEP}${d.retries.toString}" }.getOrElse("")
     s"${frameworkId}-${desc.submissionId}${retries}"
   }
 
+  private def getDriverTaskId(desc: MesosDriverDescription): String = {
+    val sId = desc.submissionId
+    desc.retryState.map(state => sId + s"${RETRY_SEP}${state.retries.toString}").getOrElse(sId)
+  }
+
+  private def getSubmissionIdFromTaskId(taskId: String): String = {
+    taskId.split(s"${RETRY_SEP}").head
+  }
+
   private def adjust[A, B](m: collection.Map[A, B], k: A, default: B)(f: B => B) = {
     m.updated(k, f(m.getOrElse(k, default)))
   }
@@ -551,7 +563,7 @@ private[spark] class MesosClusterScheduler(
   }
 
   private def createTaskInfo(desc: MesosDriverDescription, offer: ResourceOffer): TaskInfo = {
-    val taskId = TaskID.newBuilder().setValue(desc.submissionId).build()
+    val taskId = TaskID.newBuilder().setValue(getDriverTaskId(desc)).build()
 
     val (remainingResources, cpuResourcesToUse) =
       partitionResources(offer.remainingResources, "cpus", desc.cores)
@@ -604,7 +616,7 @@ private[spark] class MesosClusterScheduler(
           val task = createTaskInfo(submission, offer)
           queuedTasks += task
           logTrace(s"Using offer ${offer.offer.getId.getValue} to launch 
driver " +
-            submission.submissionId)
+            submission.submissionId + s" with taskId: 
${task.getTaskId.toString}")
           val newState = new MesosClusterSubmissionState(
             submission,
             task.getTaskId,
@@ -718,45 +730,51 @@ private[spark] class MesosClusterScheduler(
     logInfo(s"Received status update: taskId=${taskId}" +
       s" state=${status.getState}" +
       s" message=${status.getMessage}" +
-      s" reason=${status.getReason}");
+      s" reason=${status.getReason}")
 
     stateLock.synchronized {
-      if (launchedDrivers.contains(taskId)) {
+      val subId = getSubmissionIdFromTaskId(taskId)
+      if (launchedDrivers.contains(subId)) {
         if (status.getReason == Reason.REASON_RECONCILIATION &&
           !pendingRecover.contains(taskId)) {
          // Task has already received update and no longer requires reconciliation.
           return
         }
-        val state = launchedDrivers(taskId)
+        val state = launchedDrivers(subId)
         // Check if the driver is supervise enabled and can be relaunched.
        if (state.driverDescription.supervise && shouldRelaunch(status.getState)) {
-          removeFromLaunchedDrivers(taskId)
+          removeFromLaunchedDrivers(subId)
           state.finishDate = Some(new Date())
          val retryState: Option[MesosClusterRetryState] = state.driverDescription.retryState
           val (retries, waitTimeSec) = retryState
            .map { rs => (rs.retries + 1, Math.min(maxRetryWaitTime, rs.waitTime * 2)) }
             .getOrElse{ (1, 1) }
           val nextRetry = new Date(new Date().getTime + waitTimeSec * 1000L)
-
           val newDriverDescription = state.driverDescription.copy(
            retryState = Some(new MesosClusterRetryState(status, retries, nextRetry, waitTimeSec)))
-          addDriverToPending(newDriverDescription, taskId);
+          addDriverToPending(newDriverDescription, newDriverDescription.submissionId)
         } else if (TaskState.isFinished(mesosToTaskState(status.getState))) {
-          removeFromLaunchedDrivers(taskId)
-          state.finishDate = Some(new Date())
-          if (finishedDrivers.size >= retainedDrivers) {
-            val toRemove = math.max(retainedDrivers / 10, 1)
-            finishedDrivers.trimStart(toRemove)
-          }
-          finishedDrivers += state
+          retireDriver(subId, state)
         }
         state.mesosTaskStatus = Option(status)
       } else {
-        logError(s"Unable to find driver $taskId in status update")
+        logError(s"Unable to find driver with $taskId in status update")
       }
     }
   }
 
+  private def retireDriver(
+      submissionId: String,
+      state: MesosClusterSubmissionState) = {
+    removeFromLaunchedDrivers(submissionId)
+    state.finishDate = Some(new Date())
+    if (finishedDrivers.size >= retainedDrivers) {
+      val toRemove = math.max(retainedDrivers / 10, 1)
+      finishedDrivers.trimStart(toRemove)
+    }
+    finishedDrivers += state
+  }
+
   override def frameworkMessage(
       driver: SchedulerDriver,
       executorId: ExecutorID,
@@ -769,31 +787,31 @@ private[spark] class MesosClusterScheduler(
       slaveId: SlaveID,
       status: Int): Unit = {}
 
-  private def removeFromQueuedDrivers(id: String): Boolean = {
-    val index = queuedDrivers.indexWhere(_.submissionId.equals(id))
+  private def removeFromQueuedDrivers(subId: String): Boolean = {
+    val index = queuedDrivers.indexWhere(_.submissionId.equals(subId))
     if (index != -1) {
       queuedDrivers.remove(index)
-      queuedDriversState.expunge(id)
+      queuedDriversState.expunge(subId)
       true
     } else {
       false
     }
   }
 
-  private def removeFromLaunchedDrivers(id: String): Boolean = {
-    if (launchedDrivers.remove(id).isDefined) {
-      launchedDriversState.expunge(id)
+  private def removeFromLaunchedDrivers(subId: String): Boolean = {
+    if (launchedDrivers.remove(subId).isDefined) {
+      launchedDriversState.expunge(subId)
       true
     } else {
       false
     }
   }
 
-  private def removeFromPendingRetryDrivers(id: String): Boolean = {
-    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(id))
+  private def removeFromPendingRetryDrivers(subId: String): Boolean = {
+    val index = pendingRetryDrivers.indexWhere(_.submissionId.equals(subId))
     if (index != -1) {
       pendingRetryDrivers.remove(index)
-      pendingRetryDriversState.expunge(id)
+      pendingRetryDriversState.expunge(subId)
       true
     } else {
       false
@@ -810,8 +828,8 @@ private[spark] class MesosClusterScheduler(
     revive()
   }
 
-  private def addDriverToPending(desc: MesosDriverDescription, taskId: String) = {
-    pendingRetryDriversState.persist(taskId, desc)
+  private def addDriverToPending(desc: MesosDriverDescription, subId: String) = {
+    pendingRetryDriversState.persist(subId, desc)
     pendingRetryDrivers += desc
     revive()
   }
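
One detail worth noting in the relaunch branch above: the retry wait time 
doubles on every supervised relaunch and is capped at maxRetryWaitTime. A 
standalone rendering of that backoff (the cap of 60 used here is an assumed 
placeholder, not the scheduler's actual config value):

```scala
// (retries, waitTimeSec): a first failure yields (1, 1); after that the wait
// doubles on each relaunch, capped: (2, 2), (3, 4), (4, 8), ...
def nextRetryState(prev: Option[(Int, Int)], maxRetryWaitTime: Int = 60): (Int, Int) =
  prev.map { case (retries, waitTimeSec) =>
    (retries + 1, math.min(maxRetryWaitTime, waitTimeSec * 2))
  }.getOrElse((1, 1))
```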

http://git-wip-us.apache.org/repos/asf/spark/blob/b2463fad/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
index b8c780d..40a0b8e 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/Checkpoint.scala
@@ -58,7 +58,8 @@ class Checkpoint(ssc: StreamingContext, val checkpointTime: Time)
       "spark.yarn.credentials.file",
       "spark.yarn.credentials.renewalTime",
       "spark.yarn.credentials.updateTime",
-      "spark.ui.filters")
+      "spark.ui.filters",
+      "spark.mesos.driver.frameworkId")
 
     val newSparkConf = new SparkConf(loadDefaults = false).setAll(sparkConfPairs)
       .remove("spark.driver.host")

