Repository: spark
Updated Branches:
  refs/heads/master 3394b12c3 -> af1f4da76


[SPARK-13904][SCHEDULER] Add support for pluggable cluster manager

## What changes were proposed in this pull request?

This commit adds support for pluggable cluster managers. It also allows a
cluster manager to clean up tasks without taking the parent process down.

To plug in a new external cluster manager, the ExternalClusterManager trait must
be implemented. It returns the task scheduler and scheduler backend that will be
used by SparkContext to schedule tasks. An external cluster manager is registered
using the java.util.ServiceLoader mechanism (the same mechanism that is used
to register data sources such as parquet, json, jdbc, etc.). This allows
implementations of the ExternalClusterManager interface to be loaded automatically.

Currently, when a driver fails, executors exit using System.exit. This does not
bode well for cluster managers that would like to reuse the parent process of
an executor. Hence, this commit:

  1. Moves System.exit to a function that can be overridden in subclasses of
CoarseGrainedExecutorBackend.
  2. Adds functionality for killing all the running tasks in an executor.

## How was this patch tested?
ExternalClusterManagerSuite.scala was added to test this patch.

Author: Hemant Bhanawat <hem...@snappydata.io>

Closes #11723 from hbhanawat/pluggableScheduler.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af1f4da7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af1f4da7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af1f4da7

Branch: refs/heads/master
Commit: af1f4da76268115c5a4cc3035d3236ad27f7240a
Parents: 3394b12
Author: Hemant Bhanawat <hem...@snappydata.io>
Authored: Sat Apr 16 23:43:32 2016 -0700
Committer: Reynold Xin <r...@databricks.com>
Committed: Sat Apr 16 23:43:32 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 29 +++++++-
 .../executor/CoarseGrainedExecutorBackend.scala | 17 +++--
 .../org/apache/spark/executor/Executor.scala    | 15 ++++
 .../scheduler/ExternalClusterManager.scala      | 65 +++++++++++++++++
 ...pache.spark.scheduler.ExternalClusterManager |  1 +
 .../scheduler/ExternalClusterManagerSuite.scala | 73 ++++++++++++++++++++
 dev/.rat-excludes                               |  1 +
 7 files changed, 193 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/af1f4da7/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index e41088f..e7eabd2 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -20,7 +20,7 @@ package org.apache.spark
 import java.io._
 import java.lang.reflect.Constructor
 import java.net.URI
-import java.util.{Arrays, Properties, UUID}
+import java.util.{Arrays, Properties, ServiceLoader, UUID}
 import java.util.concurrent.ConcurrentMap
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, 
AtomicReference}
 
@@ -2453,9 +2453,32 @@ object SparkContext extends Logging {
           "in the form mesos://zk://host:port. Current Master URL will stop 
working in Spark 2.0.")
         createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
 
-      case _ =>
-        throw new SparkException("Could not parse Master URL: '" + master + 
"'")
+      case masterUrl =>
+        val cm = getClusterManager(masterUrl) match {
+          case Some(clusterMgr) => clusterMgr
+          case None => throw new SparkException("Could not parse Master URL: 
'" + master + "'")
+        }
+        try {
+          val scheduler = cm.createTaskScheduler(sc, masterUrl)
+          val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
+          cm.initialize(scheduler, backend)
+          (backend, scheduler)
+        } catch {
+          case NonFatal(e) =>
+            throw new SparkException("External scheduler cannot be 
instantiated", e)
+        }
+    }
+  }
+
+  private def getClusterManager(url: String): Option[ExternalClusterManager] = 
{
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+    ServiceLoader.load(classOf[ExternalClusterManager], 
loader).asScala.filter(_.canCreate(url))
+    if (serviceLoaders.size > 1) {
+      throw new SparkException(s"Multiple Cluster Managers ($serviceLoaders) 
registered " +
+          s"for the url $url:")
     }
+    serviceLoaders.headOption
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/af1f4da7/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 71b4ad1..db5b774 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -64,7 +64,7 @@ private[spark] class CoarseGrainedExecutorBackend(
         // Always receive `true`. Just ignore it
       case Failure(e) =>
         logError(s"Cannot register with driver: $driverUrl", e)
-        System.exit(1)
+        exitExecutor()
     }(ThreadUtils.sameThread)
   }
 
@@ -81,12 +81,12 @@ private[spark] class CoarseGrainedExecutorBackend(
 
     case RegisterExecutorFailed(message) =>
       logError("Slave registration failed: " + message)
-      System.exit(1)
+      exitExecutor()
 
     case LaunchTask(data) =>
       if (executor == null) {
         logError("Received LaunchTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         val taskDesc = ser.deserialize[TaskDescription](data.value)
         logInfo("Got assigned task " + taskDesc.taskId)
@@ -97,7 +97,7 @@ private[spark] class CoarseGrainedExecutorBackend(
     case KillTask(taskId, _, interruptThread) =>
       if (executor == null) {
         logError("Received KillTask command but executor was null")
-        System.exit(1)
+        exitExecutor()
       } else {
         executor.killTask(taskId, interruptThread)
       }
@@ -127,7 +127,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       logInfo(s"Driver from $remoteAddress disconnected during shutdown")
     } else if (driver.exists(_.address == remoteAddress)) {
       logError(s"Driver $remoteAddress disassociated! Shutting down.")
-      System.exit(1)
+      exitExecutor()
     } else {
       logWarning(s"An unknown ($remoteAddress) driver disconnected.")
     }
@@ -140,6 +140,13 @@ private[spark] class CoarseGrainedExecutorBackend(
       case None => logWarning(s"Drop $msg because has not yet connected to 
driver")
     }
   }
+
+  /**
+   * This function can be overridden by child classes to handle
+   * executor exits differently. For example, when an executor goes down,
+   * the back-end may not want to take the parent process down.
+   */
+  protected def exitExecutor(): Unit = System.exit(1)
 }
 
 private[spark] object CoarseGrainedExecutorBackend extends Logging {

http://git-wip-us.apache.org/repos/asf/spark/blob/af1f4da7/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9f94fde..b20bd11 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -153,6 +153,21 @@ private[spark] class Executor(
     }
   }
 
+  /**
+   * Function to kill the running tasks in an executor.
+   * This can be called by executor back-ends to kill the
+   * tasks instead of taking the JVM down.
+   * @param interruptThread whether to interrupt the task thread
+   */
+  def killAllTasks(interruptThread: Boolean) : Unit = {
+    // kill all the running tasks
+    for (taskRunner <- runningTasks.values().asScala) {
+      if (taskRunner != null) {
+        taskRunner.kill(interruptThread)
+      }
+    }
+  }
+
   def stop(): Unit = {
     env.metricsSystem.report()
     heartbeater.shutdown()

http://git-wip-us.apache.org/repos/asf/spark/blob/af1f4da7/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
new file mode 100644
index 0000000..6ca1f56
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/ExternalClusterManager.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.SparkContext
+import org.apache.spark.annotation.DeveloperApi
+
+/**
+ * :: DeveloperApi ::
+ * A cluster manager interface for plugging in an external scheduler.
+ */
+@DeveloperApi
+trait ExternalClusterManager {
+
+  /**
+   * Check if this cluster manager instance can create scheduler components
+   * for a certain master URL.
+   * @param masterURL the master URL
+   * @return True if the cluster manager can create scheduler components for the master URL
+   */
+  def canCreate(masterURL: String): Boolean
+
+  /**
+   * Create a task scheduler instance for the given SparkContext
+   * @param sc SparkContext
+   * @param masterURL the master URL
+   * @return TaskScheduler that will be responsible for task handling
+   */
+  def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler
+
+  /**
+   * Create a scheduler backend for the given SparkContext and scheduler. This 
is
+   * called after task scheduler is created using 
[[ExternalClusterManager.createTaskScheduler()]].
+   * @param sc SparkContext
+   * @param masterURL the master URL
+   * @param scheduler TaskScheduler that will be used with the scheduler 
backend.
+   * @return SchedulerBackend that works with a TaskScheduler
+   */
+  def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend
+
+  /**
+   * Initialize task scheduler and backend scheduler. This is called after the
+   * scheduler components are created
+   * @param scheduler TaskScheduler that will be responsible for task handling
+   * @param backend SchedulerBackend that works with a TaskScheduler
+   */
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/af1f4da7/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
----------------------------------------------------------------------
diff --git 
a/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
 
b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
new file mode 100644
index 0000000..3c570ff
--- /dev/null
+++ 
b/core/src/test/resources/META-INF/services/org.apache.spark.scheduler.ExternalClusterManager
@@ -0,0 +1 @@
+org.apache.spark.scheduler.DummyExternalClusterManager
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/af1f4da7/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
new file mode 100644
index 0000000..9971d48
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExternalClusterManagerSuite.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.scheduler
+
+import org.apache.spark.{LocalSparkContext, SparkConf, SparkContext, 
SparkFunSuite}
+import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
+import org.apache.spark.storage.BlockManagerId
+
+class ExternalClusterManagerSuite extends SparkFunSuite with LocalSparkContext
+{
+  test("launch of backend and scheduler") {
+    val conf = new SparkConf().setMaster("myclusterManager").
+        setAppName("testcm").set("spark.driver.allowMultipleContexts", "true")
+    sc = new SparkContext(conf)
+    // check if the scheduler components are created
+    assert(sc.schedulerBackend.isInstanceOf[DummySchedulerBackend])
+    assert(sc.taskScheduler.isInstanceOf[DummyTaskScheduler])
+  }
+}
+
+private class DummyExternalClusterManager extends ExternalClusterManager {
+
+  def canCreate(masterURL: String): Boolean = masterURL == "myclusterManager"
+
+  def createTaskScheduler(sc: SparkContext,
+      masterURL: String): TaskScheduler = new DummyTaskScheduler
+
+  def createSchedulerBackend(sc: SparkContext,
+      masterURL: String,
+      scheduler: TaskScheduler): SchedulerBackend = new DummySchedulerBackend()
+
+  def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = 
{}
+
+}
+
+private class DummySchedulerBackend extends SchedulerBackend {
+  def start() {}
+  def stop() {}
+  def reviveOffers() {}
+  def defaultParallelism(): Int = 1
+}
+
+private class DummyTaskScheduler extends TaskScheduler {
+  override def rootPool: Pool = null
+  override def schedulingMode: SchedulingMode = SchedulingMode.NONE
+  override def start(): Unit = {}
+  override def stop(): Unit = {}
+  override def submitTasks(taskSet: TaskSet): Unit = {}
+  override def cancelTasks(stageId: Int, interruptThread: Boolean): Unit = {}
+  override def setDAGScheduler(dagScheduler: DAGScheduler): Unit = {}
+  override def defaultParallelism(): Int = 2
+  override def executorLost(executorId: String, reason: ExecutorLossReason): 
Unit = {}
+  override def applicationAttemptId(): Option[String] = None
+  def executorHeartbeatReceived(
+      execId: String,
+      accumUpdates: Array[(Long, Seq[AccumulableInfo])],
+      blockManagerId: BlockManagerId): Boolean = true
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/af1f4da7/dev/.rat-excludes
----------------------------------------------------------------------
diff --git a/dev/.rat-excludes b/dev/.rat-excludes
index 8b50614..a409a3c 100644
--- a/dev/.rat-excludes
+++ b/dev/.rat-excludes
@@ -98,3 +98,4 @@ LZ4BlockInputStream.java
 spark-deps-.*
 .*csv
 .*tsv
+org.apache.spark.scheduler.ExternalClusterManager


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to