Repository: spark
Updated Branches:
  refs/heads/branch-1.5 16414dae0 -> a3ed2c31e


[SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode.

Currently the spark applications can be queued to the Mesos cluster dispatcher, 
but when multiple jobs are in queue we don't handle removing jobs from the 
buffer correctly while iterating and causes null pointer exception.

This patch copies the buffer before iterating them, so exceptions aren't thrown 
when the jobs are removed.

Author: Timothy Chen <tnac...@gmail.com>

Closes #8322 from tnachen/fix_cluster_mode.

(cherry picked from commit 73431d8afb41b93888d2642a1ce2d011f03fb740)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3ed2c31
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3ed2c31
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3ed2c31

Branch: refs/heads/branch-1.5
Commit: a3ed2c31e60b11c09f815b42c0cd894be3150c67
Parents: 16414da
Author: Timothy Chen <tnac...@gmail.com>
Authored: Wed Aug 19 19:43:26 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Aug 19 19:43:34 2015 -0700

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala    | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a3ed2c31/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 64ec2b8..1206f18 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler(
       val driversToRetry = pendingRetryDrivers.filter { d =>
         d.retryState.get.nextRetry.before(currentTime)
       }
+
       scheduleTasks(
-        driversToRetry,
+        copyBuffer(driversToRetry),
         removeFromPendingRetryDrivers,
         currentOffers,
         tasks)
+
       // Then we walk through the queued drivers and try to schedule them.
       scheduleTasks(
-        queuedDrivers,
+        copyBuffer(queuedDrivers),
         removeFromQueuedDrivers,
         currentOffers,
         tasks)
@@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler(
       .foreach(o => driver.declineOffer(o.getId))
   }
 
+  private def copyBuffer(
+      buffer: ArrayBuffer[MesosDriverDescription]): 
ArrayBuffer[MesosDriverDescription] = {
+    val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
+    buffer.copyToBuffer(newBuffer)
+    newBuffer
+  }
+
   def getSchedulerState(): MesosClusterSchedulerState = {
-    def copyBuffer(
-        buffer: ArrayBuffer[MesosDriverDescription]): 
ArrayBuffer[MesosDriverDescription] = {
-      val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
-      buffer.copyToBuffer(newBuffer)
-      newBuffer
-    }
     stateLock.synchronized {
       new MesosClusterSchedulerState(
         frameworkId,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to