This is an automated email from the ASF dual-hosted git repository. vanzin pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ec5e342 [SPARK-27094][YARN] Work around RackResolver swallowing thread interrupt. ec5e342 is described below commit ec5e34205a8b0e2f6bc4287b86e7eac269452ffb Author: Marcelo Vanzin <van...@cloudera.com> AuthorDate: Wed Mar 20 11:48:06 2019 -0700 [SPARK-27094][YARN] Work around RackResolver swallowing thread interrupt. To avoid the case where the YARN libraries would swallow the exception and prevent YarnAllocator from shutting down, call the offending code in a separate thread, so that the parent thread can respond appropriately to the shutdown. As a safeguard, also explicitly stop the executor launch thread pool when shutting down the application, to prevent new executors from coming up after the application started its shutdown. Tested with unit tests + some internal tests on a real cluster. Closes #24017 from vanzin/SPARK-27094. Authored-by: Marcelo Vanzin <van...@cloudera.com> Signed-off-by: Marcelo Vanzin <van...@cloudera.com> --- .../spark/deploy/yarn/ApplicationMaster.scala | 154 +++++++++++---------- .../apache/spark/deploy/yarn/YarnAllocator.scala | 45 +++++- 2 files changed, 120 insertions(+), 79 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala index 9ed3b78..743c2e0 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala @@ -550,88 +550,94 @@ private[spark] class ApplicationMaster( reporterThread.join() } - private def launchReporterThread(): Thread = { - // The number of failures in a row until Reporter thread give up + private def allocationThreadImpl(): Unit = { + // The number of failures in a row until the allocation thread gives up. 
val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES) - - val t = new Thread { - override def run() { - var failureCount = 0 - while (!finished) { - try { - if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, - s"Max number of executor failures ($maxNumExecutorFailures) reached") - } else if (allocator.isAllNodeBlacklisted) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, - "Due to executor failures all available nodes are blacklisted") - } else { - logDebug("Sending progress") - allocator.allocateResources() - } - failureCount = 0 - } catch { - case i: InterruptedException => // do nothing - case e: ApplicationAttemptNotFoundException => - failureCount += 1 - logError("Exception from Reporter thread.", e) - finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, - e.getMessage) - case e: Throwable => - failureCount += 1 - if (!NonFatal(e) || failureCount >= reporterMaxFailures) { - finish(FinalApplicationStatus.FAILED, - ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + - s"$failureCount time(s) from Reporter thread.") - } else { - logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e) - } + var failureCount = 0 + while (!finished) { + try { + if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, + s"Max number of executor failures ($maxNumExecutorFailures) reached") + } else if (allocator.isAllNodeBlacklisted) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES, + "Due to executor failures all available nodes are blacklisted") + } else { + logDebug("Sending progress") + allocator.allocateResources() + } + failureCount = 0 + } catch { + case i: InterruptedException => // do nothing + case e: 
ApplicationAttemptNotFoundException => + failureCount += 1 + logError("Exception from Reporter thread.", e) + finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE, + e.getMessage) + case e: Throwable => + failureCount += 1 + if (!NonFatal(e) || failureCount >= reporterMaxFailures) { + finish(FinalApplicationStatus.FAILED, + ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " + + s"$failureCount time(s) from Reporter thread.") + } else { + logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e) } - try { - val numPendingAllocate = allocator.getPendingAllocate.size - var sleepStartNs = 0L - var sleepInterval = 200L // ms - allocatorLock.synchronized { - sleepInterval = - if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) { - val currentAllocationInterval = - math.min(heartbeatInterval, nextAllocationInterval) - nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow - currentAllocationInterval - } else { - nextAllocationInterval = initialAllocationInterval - heartbeatInterval - } - sleepStartNs = System.nanoTime() - allocatorLock.wait(sleepInterval) - } - val sleepDuration = System.nanoTime() - sleepStartNs - if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) { - // log when sleep is interrupted - logDebug(s"Number of pending allocations is $numPendingAllocate. " + - s"Slept for $sleepDuration/$sleepInterval ms.") - // if sleep was less than the minimum interval, sleep for the rest of it - val toSleep = math.max(0, initialAllocationInterval - sleepDuration) - if (toSleep > 0) { - logDebug(s"Going back to sleep for $toSleep ms") - // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up - // by the methods that signal allocatorLock because this is just finishing the min - // sleep interval, which should happen even if this is signalled again. 
- Thread.sleep(toSleep) - } + } + try { + val numPendingAllocate = allocator.getPendingAllocate.size + var sleepStartNs = 0L + var sleepInterval = 200L // ms + allocatorLock.synchronized { + sleepInterval = + if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) { + val currentAllocationInterval = + math.min(heartbeatInterval, nextAllocationInterval) + nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow + currentAllocationInterval } else { - logDebug(s"Number of pending allocations is $numPendingAllocate. " + - s"Slept for $sleepDuration/$sleepInterval.") + nextAllocationInterval = initialAllocationInterval + heartbeatInterval } - } catch { - case e: InterruptedException => + sleepStartNs = System.nanoTime() + allocatorLock.wait(sleepInterval) + } + val sleepDuration = System.nanoTime() - sleepStartNs + if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) { + // log when sleep is interrupted + logDebug(s"Number of pending allocations is $numPendingAllocate. " + + s"Slept for $sleepDuration/$sleepInterval ms.") + // if sleep was less than the minimum interval, sleep for the rest of it + val toSleep = math.max(0, initialAllocationInterval - sleepDuration) + if (toSleep > 0) { + logDebug(s"Going back to sleep for $toSleep ms") + // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up + // by the methods that signal allocatorLock because this is just finishing the min + // sleep interval, which should happen even if this is signalled again. + Thread.sleep(toSleep) } + } else { + logDebug(s"Number of pending allocations is $numPendingAllocate. 
" + + s"Slept for $sleepDuration/$sleepInterval.") + } + } catch { + case e: InterruptedException => + } + } + } + + private def launchReporterThread(): Thread = { + val t = new Thread { + override def run(): Unit = { + try { + allocationThreadImpl() + } finally { + allocator.stop() } } } - // setting to daemon status, though this is usually not a good idea. t.setDaemon(true) t.setName("Reporter") t.start() diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 8c6eff99..f9bdddc 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -381,6 +381,13 @@ private[yarn] class YarnAllocator( } } + def stop(): Unit = { + // Forcefully shut down the launcher pool, in case this is being called in the middle of + // container allocation. This will prevent queued executors from being started - and + // potentially interrupt active ExecutorRunnable instances too. + launcherPool.shutdownNow() + } + private def hostStr(request: ContainerRequest): String = { Option(request.getNodes) match { case Some(nodes) => nodes.asScala.mkString(",") @@ -417,12 +424,40 @@ private[yarn] class YarnAllocator( containersToUse, remainingAfterHostMatches) } - // Match remaining by rack + // Match remaining by rack. Because YARN's RackResolver swallows thread interrupts + // (see SPARK-27094), which can cause this code to miss interrupts from the AM, use + // a separate thread to perform the operation. 
val remainingAfterRackMatches = new ArrayBuffer[Container] - for (allocatedContainer <- remainingAfterHostMatches) { - val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost) - matchContainerToRequest(allocatedContainer, rack, containersToUse, - remainingAfterRackMatches) + if (remainingAfterHostMatches.nonEmpty) { + var exception: Option[Throwable] = None + val thread = new Thread("spark-rack-resolver") { + override def run(): Unit = { + try { + for (allocatedContainer <- remainingAfterHostMatches) { + val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost) + matchContainerToRequest(allocatedContainer, rack, containersToUse, + remainingAfterRackMatches) + } + } catch { + case e: Throwable => + exception = Some(e) + } + } + } + thread.setDaemon(true) + thread.start() + + try { + thread.join() + } catch { + case e: InterruptedException => + thread.interrupt() + throw e + } + + if (exception.isDefined) { + throw exception.get + } } // Assign remaining that are neither node-local nor rack-local --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org