This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89d44d092af [SPARK-43510][YARN] Fix YarnAllocator internal state when adding running executor after processing completed containers
89d44d092af is described below

commit 89d44d092af4ae53fec296ca6569e240ad4c2bc5
Author: manuzhang <owenzhang1...@gmail.com>
AuthorDate: Tue Jun 6 08:28:52 2023 -0500

    [SPARK-43510][YARN] Fix YarnAllocator internal state when adding running executor after processing completed containers
    
    ### What changes were proposed in this pull request?
    Track the container ids of launching executors in YarnAllocator, and
    skip updating internal state for a container that has already completed.
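
    A minimal, hypothetical sketch of the guard pattern (simplified names
    and state; the actual change is in the diff below):

    ```scala
    import scala.collection.mutable

    // Simplified stand-in for YarnAllocator's bookkeeping.
    class AllocatorSketch {
      private val launchingContainerIds = mutable.HashSet[String]()
      private val runningExecutors = mutable.HashSet[String]()

      // Called on the allocator thread before handing the container
      // to the launcher pool.
      def markLaunching(containerId: String): Unit = synchronized {
        launchingContainerIds.add(containerId)
      }

      // Called from the launcher thread once the executor is up. The
      // update is applied only if the container is still launching,
      // i.e. it has not been reported as completed in the meantime.
      def updateState(containerId: String, executorId: String): Unit =
        synchronized {
          if (launchingContainerIds.remove(containerId)) {
            runningExecutors.add(executorId)
          }
        }

      // Called when YARN reports the container completed (e.g.
      // preempted); dropping the id here closes the race window.
      def containerCompleted(containerId: String): Unit = synchronized {
        launchingContainerIds.remove(containerId)
      }
    }
    ```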
    
    ### Why are the changes needed?
    YarnAllocator updates its internal state, adding an executor to the
    running set, from a separate launcher thread after the executor has
    been launched. That update can run after the container has already
    completed (e.g. been preempted) and been processed by YarnAllocator.
    YarnAllocator then mistakenly believes executors that are already
    lost are still running, and as a result the application hangs
    without any running executors.
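
    To make the interleaving concrete, here is a minimal, self-contained
    sketch of the pre-patch behavior (hypothetical names, no real YARN
    involved):

    ```scala
    import scala.collection.mutable

    object RaceSketch {
      private val runningExecutors = mutable.HashSet[String]()

      // Pre-patch updateInternalState: adds the executor unconditionally.
      def launcherThreadFinished(executorId: String): Unit = synchronized {
        runningExecutors.add(executorId)
      }

      // Completion processing removes whatever is currently tracked.
      def containerCompleted(executorId: String): Unit = synchronized {
        runningExecutors.remove(executorId)
      }

      def main(args: Array[String]): Unit = {
        // Problematic order: the container is preempted and its
        // completion is processed first...
        containerCompleted("exec-1")
        // ...then the slow launcher thread reports the executor as running.
        launcherThreadFinished("exec-1")
        // The allocator now counts a running executor that is already lost.
        println(s"running executors: ${runningExecutors.size}") // 1, expected 0
      }
    }
    ```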
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Added UT.
    
    Closes #41173 from manuzhang/spark-43510.
    
    Authored-by: manuzhang <owenzhang1...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 42 ++++++++++++++--------
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 28 ++++++++++++++-
 2 files changed, 54 insertions(+), 16 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index b6ee21ed817..19c06f95731 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -91,6 +91,9 @@ private[yarn] class YarnAllocator(
   @GuardedBy("this")
   private val releasedContainers = collection.mutable.HashSet[ContainerId]()
 
+  @GuardedBy("this")
+  private val launchingExecutorContainerIds = collection.mutable.HashSet[ContainerId]()
+
   @GuardedBy("this")
  private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]()
 
@@ -738,19 +741,6 @@ private[yarn] class YarnAllocator(
       logInfo(s"Launching container $containerId on host $executorHostname " +
         s"for executor with ID $executorId for ResourceProfile Id $rpId")
 
-      def updateInternalState(): Unit = synchronized {
-        getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
-        getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
-        executorIdToContainer(executorId) = container
-        containerIdToExecutorIdAndResourceProfileId(container.getId) = (executorId, rpId)
-
-        val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
-        val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname,
-          new HashSet[ContainerId])
-        containerSet += containerId
-        allocatedContainerToHostMap.put(containerId, executorHostname)
-      }
-
       val rp = rpIdToResourceProfile(rpId)
      val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf)
       val containerMem = rp.executorResources.get(ResourceProfile.MEMORY).
@@ -763,6 +753,7 @@ private[yarn] class YarnAllocator(
       val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size
       if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) {
         getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet()
+        launchingExecutorContainerIds.add(containerId)
         if (launchContainers) {
           launcherPool.execute(() => {
             try {
@@ -780,10 +771,11 @@ private[yarn] class YarnAllocator(
                 localResources,
                 rp.id
               ).run()
-              updateInternalState()
+              updateInternalState(rpId, executorId, container)
             } catch {
               case e: Throwable =>
                 getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
+                launchingExecutorContainerIds.remove(containerId)
                 if (NonFatal(e)) {
                   logError(s"Failed to launch executor $executorId on 
container $containerId", e)
                   // Assigned container should be released immediately
@@ -796,7 +788,7 @@ private[yarn] class YarnAllocator(
           })
         } else {
           // For test only
-          updateInternalState()
+          updateInternalState(rpId, executorId, container)
         }
       } else {
         logInfo(("Skip launching executorRunnable as running executors count: 
%d " +
@@ -806,11 +798,31 @@ private[yarn] class YarnAllocator(
     }
   }
 
+  private def updateInternalState(rpId: Int, executorId: String,
+      container: Container): Unit = synchronized {
+    val containerId = container.getId
+    if (launchingExecutorContainerIds.contains(containerId)) {
+      getOrUpdateRunningExecutorForRPId(rpId).add(executorId)
+      executorIdToContainer(executorId) = container
+      containerIdToExecutorIdAndResourceProfileId(containerId) = (executorId, rpId)
+
+      val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId)
+      val executorHostname = container.getNodeId.getHost
+      val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname,
+        new HashSet[ContainerId])
+      containerSet += containerId
+      allocatedContainerToHostMap.put(containerId, executorHostname)
+      launchingExecutorContainerIds.remove(containerId)
+    }
+    getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet()
+  }
+
   // Visible for testing.
   private[yarn] def processCompletedContainers(
       completedContainers: Seq[ContainerStatus]): Unit = synchronized {
     for (completedContainer <- completedContainers) {
       val containerId = completedContainer.getContainerId
+      launchingExecutorContainerIds.remove(containerId)
      val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId,
         ("", DEFAULT_RESOURCE_PROFILE_ID))
       val alreadyReleased = releasedContainers.remove(containerId)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index ed591fd9e36..055edfbf767 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn
 
 import java.util
 import java.util.Collections
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
 import scala.collection.mutable
@@ -33,6 +34,7 @@ import org.mockito.ArgumentCaptor
 import org.mockito.Mockito._
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
+import org.scalatest.PrivateMethodTester
 import org.scalatest.matchers.must.Matchers
 import org.scalatest.matchers.should.Matchers._
 
@@ -60,7 +62,7 @@ class MockResolver extends SparkRackResolver(SparkHadoopUtil.get.conf) {
 
 }
 
-class YarnAllocatorSuite extends SparkFunSuite with Matchers {
+class YarnAllocatorSuite extends SparkFunSuite with Matchers with PrivateMethodTester {
   val conf = new YarnConfiguration()
   val sparkConf = new SparkConf()
   sparkConf.set(DRIVER_HOST_ADDRESS, "localhost")
@@ -832,4 +834,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers {
     verify(rpcEndPoint, times(1)).
       send(DecommissionExecutorsOnHost(org.mockito.ArgumentMatchers.any()))
   }
+
+  test("SPARK-43510: Running executors should be none when YarnAllocator adds 
running executors " +
+    "after processing completed containers") {
+    val (handler, _) = createAllocator(1)
+    handler.updateResourceRequests()
+    handler.getNumExecutorsRunning should be(0)
+    handler.getNumContainersPendingAllocate should be(1)
+
+    val container = createContainer("host1")
+    handler.handleAllocatedContainers(Array(container))
+    handler.getNumExecutorsRunning should be(1)
+    handler.getNumContainersPendingAllocate should be(0)
+
+    val status = ContainerStatus.newInstance(
+      container.getId, ContainerState.COMPLETE, "Finished", 0)
+    val getOrUpdateNumExecutorsStartingForRPId = PrivateMethod[AtomicInteger](
+      Symbol("getOrUpdateNumExecutorsStartingForRPId"))
+    handler.invokePrivate(getOrUpdateNumExecutorsStartingForRPId(0)).incrementAndGet()
+    handler.processCompletedContainers(Seq(status))
+    val updateInternalState = PrivateMethod[Unit](Symbol("updateInternalState"))
+    handler.invokePrivate(updateInternalState(0, "1", container))
+    handler.getNumExecutorsRunning should be(0)
+    handler.getNumExecutorsStarting should be(0)
+  }
 }

