This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new e857b7ad1c7  [SPARK-39601][YARN] AllocationFailure should not be treated as exitCausedByApp when driver is shutting down
e857b7ad1c7 is described below

commit e857b7ad1c78c57d06436e387473d83e61293c7c
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Tue Dec 13 08:18:08 2022 -0600

[SPARK-39601][YARN] AllocationFailure should not be treated as exitCausedByApp when driver is shutting down

### What changes were proposed in this pull request?

Treat container `AllocationFailure` as not "exitCausedByApp" when the driver is shutting down. The approach is suggested at https://github.com/apache/spark/pull/36991#discussion_r915948343

### Why are the changes needed?

I observed some Spark applications that successfully completed all jobs but still failed during the shutdown phase with the reason "Max number of executor failures (16) reached". The timeline is:

Driver - Jobs succeed and Spark starts the shutdown procedure.
```
2022-06-23 19:50:55 CST AbstractConnector INFO - Stopped Spark@74e9431b{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
2022-06-23 19:50:55 CST SparkUI INFO - Stopped Spark web UI at http://hadoop2627.xxx.org:28446
2022-06-23 19:50:55 CST YarnClusterSchedulerBackend INFO - Shutting down all executors
```
Driver - A container is allocated successfully during the shutdown phase.
```
2022-06-23 19:52:21 CST YarnAllocator INFO - Launching container container_e94_1649986670278_7743380_02_000025 on host hadoop4388.xxx.org for executor with ID 24 for ResourceProfile Id 0
```
Executor - The executor cannot connect to the driver endpoint because the driver has already stopped the endpoint.
```
Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
    at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393)
    at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:81)
    at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:413)
    at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
    at scala.collection.immutable.Range.foreach(Range.scala:158)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:411)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
    at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
    ... 4 more
Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: Cannot find endpoint: spark://CoarseGrainedScheduler@hadoop2627.xxx.org:21956
    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1(NettyRpcEnv.scala:148)
    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1$adapted(NettyRpcEnv.scala:144)
    at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
    at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
    at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
```
Driver - YarnAllocator receives the container launch error and treats it as `exitCausedByApp`.
```
2022-06-23 19:52:27 CST YarnAllocator INFO - Completed container container_e94_1649986670278_7743380_02_000025 on host: hadoop4388.xxx.org (state: COMPLETE, exit status: 1)
2022-06-23 19:52:27 CST YarnAllocator WARN - Container from a bad node: container_e94_1649986670278_7743380_02_000025 on host: hadoop4388.xxx.org. Exit status: 1. Diagnostics: [2022-06-23 19:52:24.932]Exception from container-launch.
Container id: container_e94_1649986670278_7743380_02_000025
Exit code: 1
Shell output: main : command provided 1
main : run as user is bdms_pm
main : requested yarn user is bdms_pm
Getting exit code file...
Creating script paths...
Writing pid file...
Writing to tmp file /mnt/dfs/2/yarn/local/nmPrivate/application_1649986670278_7743380/container_e94_1649986670278_7743380_02_000025/container_e94_1649986670278_7743380_02_000025.pid.tmp
Writing to cgroup task files...
Creating local dirs...
Launching container...
Getting exit code file...
Creating script paths...

[2022-06-23 19:52:24.938]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :
Last 4096 bytes of stderr :
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
    at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
    at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    at scala.concurrent.Promise.trySuccess(Promise.scala:94)
    at scala.concurrent.Promise.trySuccess$(Promise.scala:94)
    at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:187)
    at org.apache.spark.rpc.netty.NettyRpcEnv.onSuccess$1(NettyRpcEnv.scala:225)
    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7(NettyRpcEnv.scala:246)
    at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7$adapted(NettyRpcEnv.scala:246)
    at org.apache.spark.rpc.netty.RpcOutboxMessage.onSuccess(Outbox.scala:90)
    at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:195)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
```
Driver - Eventually the application fails because the number of "failed" executors reaches the threshold.
```
2022-06-23 19:52:30 CST ApplicationMaster INFO - Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (16) reached)
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated UT.

Closes #38622 from pan3793/SPARK-39601.
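
For readers who do not want to walk the full diff below, here is a minimal, self-contained sketch of the behavior this patch adds. The object and method names (`ShutdownAwareAllocator`, `onContainerCompleted`) are illustrative stand-ins, not Spark APIs; the real change lives in `YarnAllocator.setShutdown` and the exit-status handling shown in the diff.

```
// Illustrative sketch only (not Spark code): once the driver signals shutdown,
// a completed container is no longer treated as "exitCausedByApp", so it does
// not count toward the max-number-of-executor-failures limit.
object ShutdownAwareAllocator {

  @volatile private var shutdown = false
  private var numExecutorsFailed = 0

  // Mirrors YarnAllocator.setShutdown, which ApplicationMaster calls on Shutdown(code).
  def setShutdown(value: Boolean): Unit = shutdown = value

  // Simplified version of the exit-status match in processCompletedContainers.
  def onContainerCompleted(containerId: String, exitStatus: Int): Unit = {
    val exitCausedByApp = exitStatus match {
      case _ if shutdown => false // the new case added by this patch
      case 0             => false // ContainerExitStatus.SUCCESS
      case _             => true  // simplified; the real code also handles preemption etc.
    }
    if (exitCausedByApp) numExecutorsFailed += 1
  }

  def getNumExecutorsFailed: Int = numExecutorsFailed
}
```

The new `YarnAllocatorSuite` test exercises exactly this ordering: a container failure reported before `setShutdown(true)` increments the failure count, while one reported afterwards does not.
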
Authored-by: Cheng Pan <cheng...@apache.org>
Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/deploy/yarn/ApplicationMaster.scala      |  1 +
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  6 ++++++
 .../cluster/YarnClientSchedulerBackend.scala       |  2 +-
 .../cluster/YarnClusterSchedulerBackend.scala      |  4 ++++
 .../scheduler/cluster/YarnSchedulerBackend.scala   |  2 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 22 ++++++++++++++++++++++
 6 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 69dd72720a5..9815fa6df8a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -822,6 +822,7 @@ private[spark] class ApplicationMaster(
       case Shutdown(code) =>
         exitCode = code
         shutdown = true
+        allocator.setShutdown(true)
     }

     override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ee1d10c204a..4980d7e1841 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -199,6 +199,8 @@ private[yarn] class YarnAllocator(
     }
   }

+  @volatile private var shutdown = false
+
   // The default profile is always present so we need to initialize the datastructures keyed by
   // ResourceProfile id to ensure its present if things start running before a request for
   // executors could add it. This approach is easier then going and special casing everywhere.
@@ -215,6 +217,8 @@ private[yarn] class YarnAllocator(

   initDefaultProfile()

+  def setShutdown(shutdown: Boolean): Unit = this.shutdown = shutdown
+
   def getNumExecutorsRunning: Int = synchronized {
     runningExecutorsPerResourceProfileId.values.map(_.size).sum
   }
@@ -835,6 +839,8 @@ private[yarn] class YarnAllocator(
       // now I think its ok as none of the containers are expected to exit.
       val exitStatus = completedContainer.getExitStatus
       val (exitCausedByApp, containerExitReason) = exitStatus match {
+        case _ if shutdown =>
+          (false, s"Executor for container $containerId exited after Application shutdown.")
         case ContainerExitStatus.SUCCESS =>
           (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
             "preemption) and not because of an error in the running job.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6e6d8406049..717c620f5c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -162,7 +162,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def stop(exitCode: Int): Unit = {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
-    yarnSchedulerEndpoint.handleClientModeDriverStop(exitCode)
+    yarnSchedulerEndpoint.signalDriverStop(exitCode)
     if (monitorThread != null) {
       monitorThread.stopMonitor()
     }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index e70a78d3c4c..5799df94ede 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -35,6 +35,10 @@ private[spark] class YarnClusterSchedulerBackend(
     startBindings()
   }

+  override def stop(exitCode: Int): Unit = {
+    yarnSchedulerEndpoint.signalDriverStop(exitCode)
+  }
+
   override def getDriverLogUrls: Option[Map[String, String]] = {
     YarnContainerInfoHelper.getLogUrls(sc.hadoopConfiguration, container = None)
   }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 572c16d9e9b..34848a7f3d8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -319,7 +319,7 @@ private[spark] abstract class YarnSchedulerBackend(
     removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
   }

-  private[cluster] def handleClientModeDriverStop(exitCode: Int): Unit = {
+  private[cluster] def signalDriverStop(exitCode: Int): Unit = {
     amEndpoint match {
       case Some(am) =>
         am.send(Shutdown(exitCode))
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 5a80aa9c610..a5ca382fb46 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -693,6 +693,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers {
       .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
   }

+  test("SPARK-39601 YarnAllocator should not count executor failure after shutdown") {
+    val (handler, _) = createAllocator()
+    handler.updateResourceRequests()
+    handler.getNumExecutorsFailed should be(0)
+
+    val failedBeforeShutdown = createContainer("host1")
+    val failedAfterShutdown = createContainer("host2")
+    handler.handleAllocatedContainers(Seq(failedBeforeShutdown, failedAfterShutdown))
+
+    val failedBeforeShutdownStatus = ContainerStatus.newInstance(
+      failedBeforeShutdown.getId, ContainerState.COMPLETE, "Failed", -1)
+    val failedAfterShutdownStatus = ContainerStatus.newInstance(
+      failedAfterShutdown.getId, ContainerState.COMPLETE, "Failed", -1)
+
+    handler.processCompletedContainers(Seq(failedBeforeShutdownStatus))
+    handler.getNumExecutorsFailed should be(1)
+
+    handler.setShutdown(true)
+    handler.processCompletedContainers(Seq(failedAfterShutdownStatus))
+    handler.getNumExecutorsFailed should be(1)
+  }
+
   test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " +
     "when offHeapEnabled is true.") {
     val originalOffHeapEnabled = sparkConf.get(MEMORY_OFFHEAP_ENABLED)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org