Repository: spark Updated Branches: refs/heads/master cd273a238 -> f89cf65d7
SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler Author: Sandy Ryza <sa...@cloudera.com> Closes #634 from sryza/sandy-spark-1707 and squashes the following commits: 2f6e358 [Sandy Ryza] Default min registered executors ratio to .8 for YARN 354c630 [Sandy Ryza] Remove outdated comments c744ef3 [Sandy Ryza] Take out waitForInitialAllocations 2a4329b [Sandy Ryza] SPARK-1707. Remove unnecessary 3 second sleep in YarnClusterScheduler Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f89cf65d Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f89cf65d Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f89cf65d Branch: refs/heads/master Commit: f89cf65d7aced0bb387c05586f9f51cb29865022 Parents: cd273a2 Author: Sandy Ryza <sa...@cloudera.com> Authored: Mon Jul 21 13:15:46 2014 -0500 Committer: Thomas Graves <tgra...@apache.org> Committed: Mon Jul 21 13:15:46 2014 -0500 ---------------------------------------------------------------------- .../spark/deploy/yarn/ApplicationMaster.scala | 39 ------------------ .../cluster/YarnClientClusterScheduler.scala | 10 ----- .../cluster/YarnClientSchedulerBackend.scala | 5 +++ .../cluster/YarnClusterScheduler.scala | 8 +--- .../cluster/YarnClusterSchedulerBackend.scala | 5 +++ .../spark/deploy/yarn/ApplicationMaster.scala | 43 -------------------- 6 files changed, 11 insertions(+), 99 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 062f946..3ec3648 100644 --- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -255,10 +255,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkContext.getConf) } } - } finally { - // in case of exceptions, etc - ensure that count is atleast ALLOCATOR_LOOP_WAIT_COUNT : - // so that the loop (in ApplicationMaster.sparkContextInitialized) breaks - ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } } @@ -277,13 +273,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } yarnAllocator.allocateContainers( math.max(args.numExecutors - yarnAllocator.getNumExecutorsRunning, 0)) - ApplicationMaster.incrementAllocatorLoop(1) Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL) } - } finally { - // In case of exceptions, etc - ensure that count is at least ALLOCATOR_LOOP_WAIT_COUNT, - // so that the loop in ApplicationMaster#sparkContextInitialized() breaks. - ApplicationMaster.incrementAllocatorLoop(ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) } logInfo("All executors have launched.") @@ -411,24 +402,10 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } object ApplicationMaster extends Logging { - // Number of times to wait for the allocator loop to complete. - // Each loop iteration waits for 100ms, so maximum of 3 seconds. - // This is to ensure that we have reasonable number of containers before we start // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. - private val ALLOCATOR_LOOP_WAIT_COUNT = 30 private val ALLOCATE_HEARTBEAT_INTERVAL = 100 - def incrementAllocatorLoop(by: Int) { - val count = yarnAllocatorLoop.getAndAdd(by) - if (count >= ALLOCATOR_LOOP_WAIT_COUNT) { - yarnAllocatorLoop.synchronized { - // to wake threads off wait ... - yarnAllocatorLoop.notifyAll() - } - } - } - private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]() def register(master: ApplicationMaster) { @@ -437,7 +414,6 @@ object ApplicationMaster extends Logging { val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null /* initialValue */) - val yarnAllocatorLoop: AtomicInteger = new AtomicInteger(0) def sparkContextInitialized(sc: SparkContext): Boolean = { var modified = false @@ -472,21 +448,6 @@ object ApplicationMaster extends Logging { modified } - - /** - * Returns when we've either - * 1) received all the requested executors, - * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms, - * 3) hit an error that causes us to terminate trying to get containers. - */ - def waitForInitialAllocations() { - yarnAllocatorLoop.synchronized { - while (yarnAllocatorLoop.get() <= ALLOCATOR_LOOP_WAIT_COUNT) { - yarnAllocatorLoop.wait(1000L) - } - } - } - def main(argStrings: Array[String]) { SignalLogger.register(log) val args = new ApplicationMasterArguments(argStrings) http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala index 15e8c21..3474112 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientClusterScheduler.scala @@ -37,14 +37,4 @@ private[spark] class YarnClientClusterScheduler(sc: SparkContext, conf: Configur val retval = YarnAllocationHandler.lookupRack(conf, host) if (retval != null) Some(retval) else None } - - override def postStartHook() { - - super.postStartHook() - // The yarn application is running, but the executor might not yet ready - // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt - // TODO It needn't after waitBackendReady - Thread.sleep(2000L) - logInfo("YarnClientClusterScheduler.postStartHook done") - } } http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala index 1b37c4b..d8266f7 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala @@ -30,6 +30,11 @@ private[spark] class YarnClientSchedulerBackend( extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) with Logging { + if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { + minRegisteredRatio = 0.8 + ready = false + } + var client: Client = null var appId: ApplicationId = null http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala index 9ee53d7..9aeca4a 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala @@ -47,14 +47,8 @@ private[spark] class YarnClusterScheduler(sc: SparkContext, conf: Configuration) } override def postStartHook() { - val sparkContextInitialized = ApplicationMaster.sparkContextInitialized(sc) + ApplicationMaster.sparkContextInitialized(sc) super.postStartHook() - if (sparkContextInitialized){ - ApplicationMaster.waitForInitialAllocations() - // Wait for a few seconds for the slaves to bootstrap and register with master - best case attempt - // TODO It needn't after waitBackendReady - Thread.sleep(3000L) - } logInfo("YarnClusterScheduler.postStartHook done") } } http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala index a04b08f..0ad1794 100644 --- a/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala +++ b/yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala @@ -27,6 +27,11 @@ private[spark] class YarnClusterSchedulerBackend( sc: SparkContext) extends CoarseGrainedSchedulerBackend(scheduler, sc.env.actorSystem) { + if (conf.getOption("spark.scheduler.minRegisteredExecutorsRatio").isEmpty) { + minRegisteredRatio = 0.8 + ready = false + } + override def start() { super.start() var numExecutors = ApplicationMasterArguments.DEFAULT_NUMBER_EXECUTORS http://git-wip-us.apache.org/repos/asf/spark/blob/f89cf65d/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala ---------------------------------------------------------------------- diff --git a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 1a24ec7..eaf594c 100644 --- a/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/yarn/stable/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -234,10 +234,6 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, sparkContext.getConf) } } - } finally { - // In case of exceptions, etc - ensure that the loop in - // ApplicationMaster#sparkContextInitialized() breaks. - ApplicationMaster.doneWithInitialAllocations() } } @@ -254,16 +250,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, checkNumExecutorsFailed() allocateMissingExecutor() yarnAllocator.allocateResources() - if (iters == ApplicationMaster.ALLOCATOR_LOOP_WAIT_COUNT) { - ApplicationMaster.doneWithInitialAllocations() - } Thread.sleep(ApplicationMaster.ALLOCATE_HEARTBEAT_INTERVAL) iters += 1 } - } finally { - // In case of exceptions, etc - ensure that the loop in - // ApplicationMaster#sparkContextInitialized() breaks. - ApplicationMaster.doneWithInitialAllocations() } logInfo("All executors have launched.") } @@ -365,12 +354,8 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration, } object ApplicationMaster extends Logging { - // Number of times to wait for the allocator loop to complete. - // Each loop iteration waits for 100ms, so maximum of 3 seconds. - // This is to ensure that we have reasonable number of containers before we start // TODO: Currently, task to container is computed once (TaskSetManager) - which need not be // optimal as more containers are available. Might need to handle this better. - private val ALLOCATOR_LOOP_WAIT_COUNT = 30 private val ALLOCATE_HEARTBEAT_INTERVAL = 100 private val applicationMasters = new CopyOnWriteArrayList[ApplicationMaster]() @@ -378,20 +363,6 @@ object ApplicationMaster extends Logging { val sparkContextRef: AtomicReference[SparkContext] = new AtomicReference[SparkContext](null) - // Variable used to notify the YarnClusterScheduler that it should stop waiting - // for the initial set of executors to be started and get on with its business. - val doneWithInitialAllocationsMonitor = new Object() - - @volatile var isDoneWithInitialAllocations = false - - def doneWithInitialAllocations() { - isDoneWithInitialAllocations = true - doneWithInitialAllocationsMonitor.synchronized { - // to wake threads off wait ... - doneWithInitialAllocationsMonitor.notifyAll() - } - } - def register(master: ApplicationMaster) { applicationMasters.add(master) } @@ -434,20 +405,6 @@ object ApplicationMaster extends Logging { modified } - /** - * Returns when we've either - * 1) received all the requested executors, - * 2) waited ALLOCATOR_LOOP_WAIT_COUNT * ALLOCATE_HEARTBEAT_INTERVAL ms, - * 3) hit an error that causes us to terminate trying to get containers. - */ - def waitForInitialAllocations() { - doneWithInitialAllocationsMonitor.synchronized { - while (!isDoneWithInitialAllocations) { - doneWithInitialAllocationsMonitor.wait(1000L) - } - } - } - def getApplicationAttemptId(): ApplicationAttemptId = { val containerIdString = System.getenv(ApplicationConstants.Environment.CONTAINER_ID.name()) val containerId = ConverterUtils.toContainerId(containerIdString)