Repository: spark Updated Branches: refs/heads/master d60b09bb6 -> 3dd8af7a6
[SPARK-1946] Submit tasks after (configured ratio) executors have been registered Because submitting tasks and registering executors are asynchronous, in most situation, early stages' tasks run without preferred locality. A simple solution is sleeping few seconds in application, so that executors have enough time to register. The PR add 2 configuration properties to make TaskScheduler submit tasks after a few of executors have been registered. \# Submit tasks only after (registered executors / total executors) arrived the ratio, default value is 0 spark.scheduler.minRegisteredExecutorsRatio = 0.8 \# Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the maxRegisteredWaitingTime(millisecond), default value is 30000 spark.scheduler.maxRegisteredExecutorsWaitingTime = 5000 Author: li-zhihui <zhihui...@intel.com> Closes #900 from li-zhihui/master and squashes the following commits: b9f8326 [li-zhihui] Add logs & edit docs 1ac08b1 [li-zhihui] Add new configs to user docs 22ead12 [li-zhihui] Move waitBackendReady to postStartHook c6f0522 [li-zhihui] Bug fix: numExecutors wasn't set & use constant DEFAULT_NUMBER_EXECUTORS 4d6d847 [li-zhihui] Move waitBackendReady to TaskSchedulerImpl.start & some code refactor 0ecee9a [li-zhihui] Move waitBackendReady from DAGScheduler.submitStage to TaskSchedulerImpl.submitTasks 4261454 [li-zhihui] Add docs for new configs & code style ce0868a [li-zhihui] Code style, rename configuration property name of minRegisteredRatio & maxRegisteredWaitingTime 6cfb9ec [li-zhihui] Code style, revert default minRegisteredRatio of yarn to 0, driver get --num-executors in yarn/alpha 812c33c [li-zhihui] Fix driver lost --num-executors option in yarn-cluster mode e7b6272 [li-zhihui] support yarn-cluster 37f7dc2 [li-zhihui] support yarn mode(percentage style) 3f8c941 [li-zhihui] submit stage after (configured ratio of) executors have been registered Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dd8af7a Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dd8af7a Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dd8af7a Branch: refs/heads/master Commit: 3dd8af7a6623201c28231f4b71f59ea4e9ae29bf Parents: d60b09b Author: li-zhihui <zhihui...@intel.com> Authored: Mon Jul 14 15:32:49 2014 -0500 Committer: Thomas Graves <tgra...@apache.org> Committed: Mon Jul 14 15:32:49 2014 -0500 ---------------------------------------------------------------------- .../scala/org/apache/spark/SparkContext.scala | 11 +++++- .../spark/scheduler/SchedulerBackend.scala | 1 + .../spark/scheduler/TaskSchedulerImpl.scala | 15 ++++++++ .../cluster/CoarseGrainedSchedulerBackend.scala | 29 ++++++++++++++ .../cluster/SparkDeploySchedulerBackend.scala | 1 + docs/configuration.md | 19 ++++++++++ .../spark/deploy/yarn/ApplicationMaster.scala | 1 + .../yarn/ApplicationMasterArguments.scala | 6 ++- .../cluster/YarnClientClusterScheduler.scala | 2 + .../cluster/YarnClientSchedulerBackend.scala | 1 + .../cluster/YarnClusterScheduler.scala | 2 + .../cluster/YarnClusterSchedulerBackend.scala | 40 ++++++++++++++++++++ .../spark/deploy/yarn/ApplicationMaster.scala | 1 + 13 files changed, 127 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 8819e73..8052499 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -1531,7 +1531,16 @@ object SparkContext extends Logging { throw new SparkException("YARN mode not available ?", e) } } - val backend = new CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) + val backend = try { + val clazz = + Class.forName("org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend") + val cons = clazz.getConstructor(classOf[TaskSchedulerImpl], classOf[SparkContext]) + cons.newInstance(scheduler, sc).asInstanceOf[CoarseGrainedSchedulerBackend] + } catch { + case e: Exception => { + throw new SparkException("YARN mode not available ?", e) + } + } scheduler.initialize(backend) scheduler http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala index 6a6d8e6..e41e0a9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala @@ -30,4 +30,5 @@ private[spark] trait SchedulerBackend { def killTask(taskId: Long, executorId: String, interruptThread: Boolean): Unit = throw new UnsupportedOperationException + def isReady(): Boolean = true } http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 5ed2803..4b6d6da 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -145,6 +145,10 @@ private[spark] class TaskSchedulerImpl( } } + override def postStartHook() { + waitBackendReady() + } + override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasks logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks") @@ -437,6 +441,17 @@ private[spark] class TaskSchedulerImpl( // By default, rack is unknown def getRackForHost(value: String): Option[String] = None + + private def waitBackendReady(): Unit = { + if (backend.isReady) { + return + } + while (!backend.isReady) { + synchronized { + this.wait(100) + } + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index 05d01b0..0f5545e 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -46,9 +46,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A { // Use an atomic variable to track total number of cores in the cluster for simplicity and speed var totalCoreCount = new AtomicInteger(0) + var totalExpectedExecutors = new AtomicInteger(0) val conf = scheduler.sc.conf private val timeout = AkkaUtils.askTimeout(conf) private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf) + // Submit tasks only after (registered executors / total expected executors) + // is equal to at least this value, that is double between 0 and 1. + var minRegisteredRatio = conf.getDouble("spark.scheduler.minRegisteredExecutorsRatio", 0) + if (minRegisteredRatio > 1) minRegisteredRatio = 1 + // Whatever minRegisteredExecutorsRatio is arrived, submit tasks after the time(milliseconds). + val maxRegisteredWaitingTime = + conf.getInt("spark.scheduler.maxRegisteredExecutorsWaitingTime", 30000) + val createTime = System.currentTimeMillis() + var ready = if (minRegisteredRatio <= 0) true else false class DriverActor(sparkProperties: Seq[(String, String)]) extends Actor { private val executorActor = new HashMap[String, ActorRef] @@ -83,6 +93,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A executorAddress(executorId) = sender.path.address addressToExecutorId(sender.path.address) = executorId totalCoreCount.addAndGet(cores) + if (executorActor.size >= totalExpectedExecutors.get() * minRegisteredRatio && !ready) { + ready = true + logInfo("SchedulerBackend is ready for scheduling beginning, registered executors: " + + executorActor.size + ", total expected executors: " + totalExpectedExecutors.get() + + ", minRegisteredExecutorsRatio: " + minRegisteredRatio) + } makeOffers() } @@ -247,6 +263,19 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A throw new SparkException("Error notifying standalone scheduler's driver actor", e) } } + + override def isReady(): Boolean = { + if (ready) { + return true + } + if ((System.currentTimeMillis() - createTime) >= maxRegisteredWaitingTime) { + ready = true + logInfo("SchedulerBackend is ready for scheduling beginning after waiting " + + "maxRegisteredExecutorsWaitingTime: " + maxRegisteredWaitingTime) + return true + } + false + } } private[spark] object CoarseGrainedSchedulerBackend { http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala index 9c07b3f..bf2dc88 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala @@ -95,6 +95,7 @@ private[spark] class SparkDeploySchedulerBackend( override def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) { + totalExpectedExecutors.addAndGet(1) logInfo("Granted executor ID %s on hostPort %s with %d cores, %s RAM".format( fullId, hostPort, cores, Utils.megabytesToString(memory))) } http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/docs/configuration.md ---------------------------------------------------------------------- diff --git a/docs/configuration.md b/docs/configuration.md index 0aea23a..07aa4c0 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -699,6 +699,25 @@ Apart from these, the following properties are also available, and may be useful (in milliseconds) </td> </tr> +</tr> + <td><code>spark.scheduler.minRegisteredExecutorsRatio</code></td> + <td>0</td> + <td> + The minimum ratio of registered executors (registered executors / total expected executors) + to wait for before scheduling begins. Specified as a double between 0 and 1. + Regardless of whether the minimum ratio of executors has been reached, + the maximum amount of time it will wait before scheduling begins is controlled by config + <code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code> + </td> +</tr> +<tr> + <td><code>spark.scheduler.maxRegisteredExecutorsWaitingTime</code></td> + <td>30000</td> + <td> + Maximum amount of time to wait for executors to register before scheduling begins + (in milliseconds). + </td> +</tr> </table> #### Security http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 438737f..062f946 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -184,6 +184,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") + System.setProperty("spark.executor.instances", args.numExecutors.toString) val mainMethod = Class.forName( args.userClass, false /* initialize */ , http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala index 25cc901..4c383ab 100644 --- a/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala +++ b/yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMasterArguments.scala @@ -26,7 +26,7 @@ class ApplicationMasterArguments(val args: Array[String]) { var userArgs: Seq[String] = Seq[String]() var executorMemory = 1024 var executorCores = 1 - var numExecutors = 2 + var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS parseArgs(args.toList) @@ -93,3 +93,7 @@ class ApplicationMasterArguments(val args: Array[String]) { System.exit(exitCode) } } + +object ApplicationMasterArguments { + val DEFAULT_NUMBER_EXECUTORS = 2 +} http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 6b91e6b..15e8c21 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -40,8 +40,10 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur override def postStartHook() { + super.postStartHook() // The yarn application is running, but the executor might not yet ready // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt + // TODO It needn't after waitBackendReady Thread.sleep(2000L) logInfo("YarnClientClusterScheduler.postStartHook done") } http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index fd2694f..0f9fdcf 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -75,6 +75,7 @@ private[spark] class YarnClientSchedulerBackend( logDebug("ClientArguments called with: " + argsArrayBuf) val args = new ClientArguments(argsArrayBuf.toArray, conf) + totalExpectedExecutors.set(args.numExecutors) client = new Client(args, conf) appId = client.runApp() waitForApp() http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 39cdd2e..9ee53d7 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -48,9 +48,11 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) override def postStartHook() { val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc) + super.postStartHook() if (sparkContextInitialized){ ApplicationMaster.waitForInitialAllocations() // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt + // TODO It needn't after waitBackendReady Thread.sleep(3000L) } logInfo("YarnClusterScheduler.postStartHook done") http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala new file mode 100644 index 0000000..a04b08f --- /dev/null +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler.cluster + +import org.apache.spark.SparkContext +import org.apache.spark.deploy.yarn.ApplicationMasterArguments +import org.apache.spark.scheduler.TaskSchedulerImpl +import org.apache.spark.util.IntParam + +private[spark] class YarnClusterSchedulerBackend( + scheduler: TaskSchedulerImpl, + sc: SparkContext) + extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + + override def start() { + super.start() + var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS + if (System.getenv("SPARK_EXECUTOR_INSTANCES") != null) { + numExecutors = IntParam.unapply(System.getenv("SPARK_EXECUTOR_INSTANCES")).getOrElse(numExecutors) + } + // System property can override environment variable. + numExecutors = sc.getConf.getInt("spark.executor.instances", numExecutors) + totalExpectedExecutors.set(numExecutors) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/3dd8af7a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index ee1e9c9..1a24ec7 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -164,6 +164,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, private def startUserClass(): Thread = { logInfo("Starting the user JAR in a separate Thread") + System.setProperty("spark.executor.instances", args.numExecutors.toString) val mainMethod = Class.forName( args.userClass, false,