Repository: spark
Updated Branches:
  refs/heads/master affc8a887 -> 73431d8af


[SPARK-10124] [MESOS] Fix removing queued driver in mesos cluster mode.

Currently, Spark applications can be queued to the Mesos cluster dispatcher,
but when multiple jobs are in the queue we don't correctly handle removing jobs
from the buffer while iterating over it, which causes a null pointer exception.

This patch copies each buffer before iterating over it, so exceptions aren't
thrown when jobs are removed during scheduling.

Author: Timothy Chen <tnac...@gmail.com>

Closes #8322 from tnachen/fix_cluster_mode.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/73431d8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/73431d8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/73431d8a

Branch: refs/heads/master
Commit: 73431d8afb41b93888d2642a1ce2d011f03fb740
Parents: affc8a8
Author: Timothy Chen <tnac...@gmail.com>
Authored: Wed Aug 19 19:43:26 2015 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Wed Aug 19 19:43:26 2015 -0700

----------------------------------------------------------------------
 .../cluster/mesos/MesosClusterScheduler.scala    | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/73431d8a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 64ec2b8..1206f18 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -507,14 +507,16 @@ private[spark] class MesosClusterScheduler(
       val driversToRetry = pendingRetryDrivers.filter { d =>
         d.retryState.get.nextRetry.before(currentTime)
       }
+
       scheduleTasks(
-        driversToRetry,
+        copyBuffer(driversToRetry),
         removeFromPendingRetryDrivers,
         currentOffers,
         tasks)
+
       // Then we walk through the queued drivers and try to schedule them.
       scheduleTasks(
-        queuedDrivers,
+        copyBuffer(queuedDrivers),
         removeFromQueuedDrivers,
         currentOffers,
         tasks)
@@ -527,13 +529,14 @@ private[spark] class MesosClusterScheduler(
       .foreach(o => driver.declineOffer(o.getId))
   }
 
+  private def copyBuffer(
+      buffer: ArrayBuffer[MesosDriverDescription]): 
ArrayBuffer[MesosDriverDescription] = {
+    val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
+    buffer.copyToBuffer(newBuffer)
+    newBuffer
+  }
+
   def getSchedulerState(): MesosClusterSchedulerState = {
-    def copyBuffer(
-        buffer: ArrayBuffer[MesosDriverDescription]): 
ArrayBuffer[MesosDriverDescription] = {
-      val newBuffer = new ArrayBuffer[MesosDriverDescription](buffer.size)
-      buffer.copyToBuffer(newBuffer)
-      newBuffer
-    }
     stateLock.synchronized {
       new MesosClusterSchedulerState(
         frameworkId,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to