This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new e857b7ad1c7 [SPARK-39601][YARN] AllocationFailure should not be treated as exitCausedByApp when driver is shutting down
e857b7ad1c7 is described below

commit e857b7ad1c78c57d06436e387473d83e61293c7c
Author: Cheng Pan <cheng...@apache.org>
AuthorDate: Tue Dec 13 08:18:08 2022 -0600

    [SPARK-39601][YARN] AllocationFailure should not be treated as exitCausedByApp when driver is shutting down
    
    ### What changes were proposed in this pull request?
    
    Treat container `AllocationFailure` as not "exitCausedByApp" when the driver is shutting down.

    The approach was suggested at https://github.com/apache/spark/pull/36991#discussion_r915948343.
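
    In essence, the allocator gains a volatile `shutdown` flag (set from the `Shutdown` message handler in `ApplicationMaster`) that short-circuits the exit-status classification. A minimal, self-contained sketch of the idea; the object and method names below are illustrative only, the real change lives in `YarnAllocator` as shown in the diff further down:

    ```
    // Illustrative sketch only: mirrors the new guard added to YarnAllocator's
    // exit-status match, with invented names and a reduced set of cases.
    object ShutdownAwareExitClassifier {
      @volatile var shutdown = false

      // Returns (exitCausedByApp, reason) for a completed container.
      def classify(containerId: String, exitStatus: Int): (Boolean, String) = exitStatus match {
        case _ if shutdown =>
          // Driver is shutting down: do not blame the application for this loss.
          (false, s"Executor for container $containerId exited after Application shutdown.")
        case 0 => // ContainerExitStatus.SUCCESS
          (false, s"Executor for container $containerId exited because of a YARN event, " +
            "not because of an error in the running job.")
        case code =>
          (true, s"Container $containerId exited with exit code $code.")
      }
    }
    ```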
    
    ### Why are the changes needed?
    
    I observed that some Spark applications completed all their jobs successfully but still failed during the shutdown phase with the reason "Max number of executor failures (16) reached". The timeline is:
    
    Driver - Jobs succeed and Spark starts the shutdown procedure.
    ```
    2022-06-23 19:50:55 CST AbstractConnector INFO - Stopped Spark74e9431b{HTTP/1.1, (http/1.1)}{0.0.0.0:0}
    2022-06-23 19:50:55 CST SparkUI INFO - Stopped Spark web UI at http://hadoop2627.xxx.org:28446
    2022-06-23 19:50:55 CST YarnClusterSchedulerBackend INFO - Shutting down all executors
    ```
    
    Driver - A container is allocated successfully during the shutdown phase.
    ```
    2022-06-23 19:52:21 CST YarnAllocator INFO - Launching container container_e94_1649986670278_7743380_02_000025 on host hadoop4388.xxx.org for executor with ID 24 for ResourceProfile Id 0
    ```
    
    Executor - The executor cannot connect to the driver endpoint because the driver has already stopped that endpoint.
    ```
    Exception in thread "main" java.lang.reflect.UndeclaredThrowableException
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1911)
      at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
      at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:393)
      at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend$.main(YarnCoarseGrainedExecutorBackend.scala:81)
      at org.apache.spark.executor.YarnCoarseGrainedExecutorBackend.main(YarnCoarseGrainedExecutorBackend.scala)
    Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult:
      at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
      at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
      at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
      at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:413)
      at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
      at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
      at scala.collection.immutable.Range.foreach(Range.scala:158)
      at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
      at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:411)
      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
      at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
      ... 4 more
    Caused by: org.apache.spark.rpc.RpcEndpointNotFoundException: Cannot find endpoint: spark://CoarseGrainedSchedulerhadoop2627.xxx.org:21956
      at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1(NettyRpcEnv.scala:148)
      at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$asyncSetupEndpointRefByURI$1$adapted(NettyRpcEnv.scala:144)
      at scala.concurrent.Future.$anonfun$flatMap$1(Future.scala:307)
      at scala.concurrent.impl.Promise.$anonfun$transformWith$1(Promise.scala:41)
      at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
      at org.apache.spark.util.ThreadUtils$$anon$1.execute(ThreadUtils.scala:99)
      at scala.concurrent.impl.ExecutionContextImpl$$anon$4.execute(ExecutionContextImpl.scala:138)
      at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
      at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
      at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
      at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
    ```
    
    Driver - YarnAllocator receives the container-launch error message and treats it as `exitCausedByApp`.
    ```
    2022-06-23 19:52:27 CST YarnAllocator INFO - Completed container container_e94_1649986670278_7743380_02_000025 on host: hadoop4388.xxx.org (state: COMPLETE, exit status: 1)
    2022-06-23 19:52:27 CST YarnAllocator WARN - Container from a bad node: container_e94_1649986670278_7743380_02_000025 on host: hadoop4388.xxx.org. Exit status: 1. Diagnostics: [2022-06-23 19:52:24.932]Exception from container-launch.
    Container id: container_e94_1649986670278_7743380_02_000025
    Exit code: 1
    Shell output: main : command provided 1
    main : run as user is bdms_pm
    main : requested yarn user is bdms_pm
    Getting exit code file...
    Creating script paths...
    Writing pid file...
    Writing to tmp file /mnt/dfs/2/yarn/local/nmPrivate/application_1649986670278_7743380/container_e94_1649986670278_7743380_02_000025/container_e94_1649986670278_7743380_02_000025.pid.tmp
    Writing to cgroup task files...
    Creating local dirs...
    Launching container...
    Getting exit code file...
    Creating script paths...
    
    [2022-06-23 19:52:24.938]Container exited with a non-zero exit code 1. Error file: prelaunch.err.
    Last 4096 bytes of prelaunch.err :
    Last 4096 bytes of stderr :
    at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:873)
      at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:72)
      at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:288)
      at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:288)
      at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:288)
      at scala.concurrent.Promise.trySuccess(Promise.scala:94)
      at scala.concurrent.Promise.trySuccess$(Promise.scala:94)
      at scala.concurrent.impl.Promise$DefaultPromise.trySuccess(Promise.scala:187)
      at org.apache.spark.rpc.netty.NettyRpcEnv.onSuccess$1(NettyRpcEnv.scala:225)
      at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7(NettyRpcEnv.scala:246)
      at org.apache.spark.rpc.netty.NettyRpcEnv.$anonfun$askAbortable$7$adapted(NettyRpcEnv.scala:246)
      at org.apache.spark.rpc.netty.RpcOutboxMessage.onSuccess(Outbox.scala:90)
      at org.apache.spark.network.client.TransportResponseHandler.handle(TransportResponseHandler.java:195)
      at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:142)
      at org.apache.spark.network.server.TransportChannelHandler.channelRead0(TransportChannelHandler.java:53)
      at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
      at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
    ```
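
    Because each of these post-shutdown losses is classified as `exitCausedByApp`, it counts toward the application's executor-failure limit. A hypothetical sketch of that accounting (class and field names are invented for illustration; this is not Spark's actual bookkeeping code):

    ```
    // Hypothetical illustration of the failure accounting that eventually trips
    // the "Max number of executor failures" check; names are made up.
    class ExecutorFailureBudget(maxFailures: Int) {
      private var failedExecutors = 0

      def onCompletedContainer(exitCausedByApp: Boolean): Unit = {
        if (exitCausedByApp) failedExecutors += 1  // post-shutdown losses used to land here
        if (failedExecutors >= maxFailures) {
          sys.error(s"Max number of executor failures ($maxFailures) reached")
        }
      }
    }
    ```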
    
    Driver - Eventually the application fails because the number of "failed" executors reaches the threshold.
    ```
    2022-06-23 19:52:30 CST ApplicationMaster INFO - Final app status: FAILED, exitCode: 11, (reason: Max number of executor failures (16) reached)
    ```
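
    For reference, the threshold seen in the message above is configurable per application. A hedged example; the configuration key `spark.yarn.max.executor.failures` is an assumption based on general Spark-on-YARN knowledge and is not mentioned in this commit, so verify it against your Spark version:

    ```
    import org.apache.spark.SparkConf

    // Illustrative only: raise the tolerated executor-failure count for a YARN application.
    // The key name is an assumption, not taken from this commit.
    val conf = new SparkConf()
      .set("spark.yarn.max.executor.failures", "32")
    ```
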
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Updated unit tests; see the new `SPARK-39601` test case in `YarnAllocatorSuite` in the diff below.
    
    Closes #38622 from pan3793/SPARK-39601.
    
    Authored-by: Cheng Pan <cheng...@apache.org>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../spark/deploy/yarn/ApplicationMaster.scala      |  1 +
 .../apache/spark/deploy/yarn/YarnAllocator.scala   |  6 ++++++
 .../cluster/YarnClientSchedulerBackend.scala       |  2 +-
 .../cluster/YarnClusterSchedulerBackend.scala      |  4 ++++
 .../scheduler/cluster/YarnSchedulerBackend.scala   |  2 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 22 ++++++++++++++++++++++
 6 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index 69dd72720a5..9815fa6df8a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -822,6 +822,7 @@ private[spark] class ApplicationMaster(
       case Shutdown(code) =>
         exitCode = code
         shutdown = true
+        allocator.setShutdown(true)
     }
 
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ee1d10c204a..4980d7e1841 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -199,6 +199,8 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  @volatile private var shutdown = false
+
   // The default profile is always present so we need to initialize the datastructures keyed by
   // ResourceProfile id to ensure its present if things start running before a request for
   // executors could add it. This approach is easier then going and special casing everywhere.
@@ -215,6 +217,8 @@ private[yarn] class YarnAllocator(
 
   initDefaultProfile()
 
+  def setShutdown(shutdown: Boolean): Unit = this.shutdown = shutdown
+
   def getNumExecutorsRunning: Int = synchronized {
     runningExecutorsPerResourceProfileId.values.map(_.size).sum
   }
@@ -835,6 +839,8 @@ private[yarn] class YarnAllocator(
         // now I think its ok as none of the containers are expected to exit.
         val exitStatus = completedContainer.getExitStatus
         val (exitCausedByApp, containerExitReason) = exitStatus match {
+          case _ if shutdown =>
+            (false, s"Executor for container $containerId exited after Application shutdown.")
           case ContainerExitStatus.SUCCESS =>
            (false, s"Executor for container $containerId exited because of a YARN event (e.g., " +
               "preemption) and not because of an error in the running job.")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
index 6e6d8406049..717c620f5c3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala
@@ -162,7 +162,7 @@ private[spark] class YarnClientSchedulerBackend(
    */
   override def stop(exitCode: Int): Unit = {
     assert(client != null, "Attempted to stop this scheduler before starting it!")
-    yarnSchedulerEndpoint.handleClientModeDriverStop(exitCode)
+    yarnSchedulerEndpoint.signalDriverStop(exitCode)
     if (monitorThread != null) {
       monitorThread.stopMonitor()
     }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
index e70a78d3c4c..5799df94ede 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -35,6 +35,10 @@ private[spark] class YarnClusterSchedulerBackend(
     startBindings()
   }
 
+  override def stop(exitCode: Int): Unit = {
+    yarnSchedulerEndpoint.signalDriverStop(exitCode)
+  }
+
   override def getDriverLogUrls: Option[Map[String, String]] = {
     YarnContainerInfoHelper.getLogUrls(sc.hadoopConfiguration, container = None)
   }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
index 572c16d9e9b..34848a7f3d8 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala
@@ -319,7 +319,7 @@ private[spark] abstract class YarnSchedulerBackend(
       removeExecutorMessage.foreach { message => driverEndpoint.send(message) }
     }
 
-    private[cluster] def handleClientModeDriverStop(exitCode: Int): Unit = {
+    private[cluster] def signalDriverStop(exitCode: Int): Unit = {
       amEndpoint match {
         case Some(am) =>
           am.send(Shutdown(exitCode))
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 5a80aa9c610..a5ca382fb46 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -693,6 +693,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers {
       .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
   }
 
+  test("SPARK-39601 YarnAllocator should not count executor failure after shutdown") {
+    val (handler, _) = createAllocator()
+    handler.updateResourceRequests()
+    handler.getNumExecutorsFailed should be(0)
+
+    val failedBeforeShutdown = createContainer("host1")
+    val failedAfterShutdown = createContainer("host2")
+    handler.handleAllocatedContainers(Seq(failedBeforeShutdown, failedAfterShutdown))
+
+    val failedBeforeShutdownStatus = ContainerStatus.newInstance(
+      failedBeforeShutdown.getId, ContainerState.COMPLETE, "Failed", -1)
+    val failedAfterShutdownStatus = ContainerStatus.newInstance(
+      failedAfterShutdown.getId, ContainerState.COMPLETE, "Failed", -1)
+
+    handler.processCompletedContainers(Seq(failedBeforeShutdownStatus))
+    handler.getNumExecutorsFailed should be(1)
+
+    handler.setShutdown(true)
+    handler.processCompletedContainers(Seq(failedAfterShutdownStatus))
+    handler.getNumExecutorsFailed should be(1)
+  }
+
   test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " +
     "when offHeapEnabled is true.") {
     val originalOffHeapEnabled = sparkConf.get(MEMORY_OFFHEAP_ENABLED)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
