This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 63d59956024 [SPARK-43510][YARN] Fix YarnAllocator internal state when adding running executor after processing completed containers 63d59956024 is described below commit 63d59956024781b062791dda9990a6043b6a10c1 Author: manuzhang <owenzhang1...@gmail.com> AuthorDate: Tue Jun 6 08:28:52 2023 -0500 [SPARK-43510][YARN] Fix YarnAllocator internal state when adding running executor after processing completed containers ### What changes were proposed in this pull request? Keep track of completed container ids in YarnAllocator and don't update internal state of a container if it's already completed. ### Why are the changes needed? YarnAllocator updates its internal state by adding running executors after executor launch, which happens in a separate thread. That can happen after the containers are already completed (e.g. preempted) and processed by YarnAllocator. Then YarnAllocator mistakenly thinks there are still running executors, even though they are already lost. As a result, the application hangs without any running executors. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added UT. Closes #41173 from manuzhang/spark-43510. 
Authored-by: manuzhang <owenzhang1...@gmail.com> Signed-off-by: Thomas Graves <tgra...@apache.org> (cherry picked from commit 89d44d092af4ae53fec296ca6569e240ad4c2bc5) Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../apache/spark/deploy/yarn/YarnAllocator.scala | 42 ++++++++++++++-------- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 28 ++++++++++++++- 2 files changed, 54 insertions(+), 16 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index 313b19f919d..dede5501a39 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -90,6 +90,9 @@ private[yarn] class YarnAllocator( @GuardedBy("this") private val releasedContainers = collection.mutable.HashSet[ContainerId]() + @GuardedBy("this") + private val launchingExecutorContainerIds = collection.mutable.HashSet[ContainerId]() + @GuardedBy("this") private val runningExecutorsPerResourceProfileId = new HashMap[Int, mutable.Set[String]]() @@ -742,19 +745,6 @@ private[yarn] class YarnAllocator( logInfo(s"Launching container $containerId on host $executorHostname " + s"for executor with ID $executorId for ResourceProfile Id $rpId") - def updateInternalState(): Unit = synchronized { - getOrUpdateRunningExecutorForRPId(rpId).add(executorId) - getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() - executorIdToContainer(executorId) = container - containerIdToExecutorIdAndResourceProfileId(container.getId) = (executorId, rpId) - - val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId) - val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname, - new HashSet[ContainerId]) - containerSet += containerId - allocatedContainerToHostMap.put(containerId, 
executorHostname) - } - val rp = rpIdToResourceProfile(rpId) val defaultResources = ResourceProfile.getDefaultProfileExecutorResources(sparkConf) val containerMem = rp.executorResources.get(ResourceProfile.MEMORY). @@ -767,6 +757,7 @@ private[yarn] class YarnAllocator( val rpRunningExecs = getOrUpdateRunningExecutorForRPId(rpId).size if (rpRunningExecs < getOrUpdateTargetNumExecutorsForRPId(rpId)) { getOrUpdateNumExecutorsStartingForRPId(rpId).incrementAndGet() + launchingExecutorContainerIds.add(containerId) if (launchContainers) { launcherPool.execute(() => { try { @@ -784,10 +775,11 @@ private[yarn] class YarnAllocator( localResources, rp.id ).run() - updateInternalState() + updateInternalState(rpId, executorId, container) } catch { case e: Throwable => getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() + launchingExecutorContainerIds.remove(containerId) if (NonFatal(e)) { logError(s"Failed to launch executor $executorId on container $containerId", e) // Assigned container should be released immediately @@ -800,7 +792,7 @@ private[yarn] class YarnAllocator( }) } else { // For test only - updateInternalState() + updateInternalState(rpId, executorId, container) } } else { logInfo(("Skip launching executorRunnable as running executors count: %d " + @@ -810,11 +802,31 @@ private[yarn] class YarnAllocator( } } + private def updateInternalState(rpId: Int, executorId: String, + container: Container): Unit = synchronized { + val containerId = container.getId + if (launchingExecutorContainerIds.contains(containerId)) { + getOrUpdateRunningExecutorForRPId(rpId).add(executorId) + executorIdToContainer(executorId) = container + containerIdToExecutorIdAndResourceProfileId(containerId) = (executorId, rpId) + + val localallocatedHostToContainersMap = getOrUpdateAllocatedHostToContainersMapForRPId(rpId) + val executorHostname = container.getNodeId.getHost + val containerSet = localallocatedHostToContainersMap.getOrElseUpdate(executorHostname, + new 
HashSet[ContainerId]) + containerSet += containerId + allocatedContainerToHostMap.put(containerId, executorHostname) + launchingExecutorContainerIds.remove(containerId) + } + getOrUpdateNumExecutorsStartingForRPId(rpId).decrementAndGet() + } + // Visible for testing. private[yarn] def processCompletedContainers( completedContainers: Seq[ContainerStatus]): Unit = synchronized { for (completedContainer <- completedContainers) { val containerId = completedContainer.getContainerId + launchingExecutorContainerIds.remove(containerId) val (_, rpId) = containerIdToExecutorIdAndResourceProfileId.getOrElse(containerId, ("", DEFAULT_RESOURCE_PROFILE_ID)) val alreadyReleased = releasedContainers.remove(containerId) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index a5ca382fb46..bc249db9337 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.deploy.yarn import java.util import java.util.Collections +import java.util.concurrent.atomic.AtomicInteger import scala.collection.JavaConverters._ import scala.collection.mutable @@ -33,6 +34,7 @@ import org.mockito.ArgumentCaptor import org.mockito.Mockito._ import org.mockito.invocation.InvocationOnMock import org.mockito.stubbing.Answer +import org.scalatest.PrivateMethodTester import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ @@ -60,7 +62,7 @@ class MockResolver extends SparkRackResolver(SparkHadoopUtil.get.conf) { } -class YarnAllocatorSuite extends SparkFunSuite with Matchers { +class YarnAllocatorSuite extends SparkFunSuite with Matchers with PrivateMethodTester { val conf = new YarnConfiguration() val sparkConf = new SparkConf() 
sparkConf.set(DRIVER_HOST_ADDRESS, "localhost") @@ -838,4 +840,28 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers { verify(rpcEndPoint, times(1)). send(DecommissionExecutorsOnHost(org.mockito.ArgumentMatchers.any())) } + + test("SPARK-43510: Running executors should be none when YarnAllocator adds running executors " + + "after processing completed containers") { + val (handler, _) = createAllocator(1) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be(0) + handler.getNumContainersPendingAllocate should be(1) + + val container = createContainer("host1") + handler.handleAllocatedContainers(Array(container)) + handler.getNumExecutorsRunning should be(1) + handler.getNumContainersPendingAllocate should be(0) + + val status = ContainerStatus.newInstance( + container.getId, ContainerState.COMPLETE, "Finished", 0) + val getOrUpdateNumExecutorsStartingForRPId = PrivateMethod[AtomicInteger]( + Symbol("getOrUpdateNumExecutorsStartingForRPId")) + handler.invokePrivate(getOrUpdateNumExecutorsStartingForRPId(0)).incrementAndGet() + handler.processCompletedContainers(Seq(status)) + val updateInternalState = PrivateMethod[Unit](Symbol("updateInternalState")) + handler.invokePrivate(updateInternalState(0, "1", container)) + handler.getNumExecutorsRunning should be(0) + handler.getNumExecutorsStarting should be(0) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org