This is an automated email from the ASF dual-hosted git repository. tgraves pushed a commit to branch branch-2.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push: new b4202e7 [SPARK-26269][YARN][BRANCH-2.4] Yarnallocator should have same blacklist behaviour with yarn to maxmize use of cluster resource b4202e7 is described below commit b4202e79833f3adc00afe00f43e8d9165c9c8e48 Author: wuyi <ngone_5...@163.com> AuthorDate: Mon Jan 7 16:22:28 2019 -0600 [SPARK-26269][YARN][BRANCH-2.4] Yarnallocator should have same blacklist behaviour with yarn to maxmize use of cluster resource ## What changes were proposed in this pull request? As I mentioned in jira [SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), in order to maxmize the use of cluster resource, this pr try to make `YarnAllocator` have the same blacklist behaviour with YARN. ## How was this patch tested? Added. Closes #23368 from Ngone51/dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN-branch-2.4. Lead-authored-by: wuyi <ngone_5...@163.com> Co-authored-by: Ngone51 <ngone_5...@163.com> Signed-off-by: Thomas Graves <tgra...@apache.org> --- .../apache/spark/deploy/yarn/YarnAllocator.scala | 31 ++++++-- .../yarn/YarnAllocatorBlacklistTracker.scala | 4 +- .../yarn/YarnAllocatorBlacklistTrackerSuite.scala | 2 +- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 83 ++++++++++++++++++++-- 4 files changed, 107 insertions(+), 13 deletions(-) diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala index f4dc80a..3357084 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala @@ -578,13 +578,23 @@ private[yarn] class YarnAllocator( (true, memLimitExceededLogMessage( completedContainer.getDiagnostics, PMEM_EXCEEDED_PATTERN)) - case _ => - // all the failures which not covered above, like: - // disk failure, kill by app master or resource manager, ... - allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt) - (true, "Container marked as failed: " + containerId + onHostStr + - ". Exit status: " + completedContainer.getExitStatus + - ". Diagnostics: " + completedContainer.getDiagnostics) + case other_exit_status => + // SPARK-26269: follow YARN's blacklisting behaviour(see https://github + // .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had + // oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap + // ache/hadoop/yarn/util/Apps.java#L273 for details) + if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) { + (false, s"Container marked as failed: $containerId$onHostStr" + + s". Exit status: ${completedContainer.getExitStatus}" + + s". Diagnostics: ${completedContainer.getDiagnostics}.") + } else { + // completed container from a bad node + allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt) + (true, s"Container from a bad node: $containerId$onHostStr" + + s". Exit status: ${completedContainer.getExitStatus}" + + s". Diagnostics: ${completedContainer.getDiagnostics}.") + } + } if (exitCausedByApp) { @@ -722,4 +732,11 @@ private object YarnAllocator { "Consider boosting spark.yarn.executor.memoryOverhead or " + "disabling yarn.nodemanager.vmem-check-enabled because of YARN-4714." } + val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set( + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, + ContainerExitStatus.KILLED_BY_APPMASTER, + ContainerExitStatus.KILLED_AFTER_APP_COMPLETION, + ContainerExitStatus.ABORTED, + ContainerExitStatus.DISKS_FAILED + ) } diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala index ceac7cd..268976b 100644 --- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala +++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala @@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker( if (removals.nonEmpty) { logInfo(s"removing nodes from YARN application master's blacklist: $removals") } - amClient.updateBlacklist(additions.asJava, removals.asJava) + if (additions.nonEmpty || removals.nonEmpty) { + amClient.updateBlacklist(additions.asJava, removals.asJava) + } currentBlacklistedYarnNodes = nodesToBlacklist } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala index aeac68e..2019107 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala @@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers // expired blacklisted nodes (simulating a resource request) yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2")) // no change is communicated to YARN regarding the blacklisting - verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList()) + verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList()) } test("combining scheduler and allocation blacklist") { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 3f783ba..2fb892e 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.deploy.yarn +import java.util.Collections + import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration @@ -24,6 +26,7 @@ import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest import org.apache.hadoop.yarn.conf.YarnConfiguration +import org.mockito.ArgumentCaptor import org.mockito.Mockito._ import org.scalatest.{BeforeAndAfterEach, Matchers} @@ -86,7 +89,8 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter def createAllocator( maxExecutors: Int = 5, - rmClient: AMRMClient[ContainerRequest] = rmClient): YarnAllocator = { + rmClient: AMRMClient[ContainerRequest] = rmClient, + additionalConfigs: Map[String, String] = Map()): YarnAllocator = { val args = Array( "--jar", "somejar.jar", "--class", "SomeClass") @@ -95,6 +99,11 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter .set("spark.executor.instances", maxExecutors.toString) .set("spark.executor.cores", "5") .set("spark.executor.memory", "2048") + + for ((name, value) <- additionalConfigs) { + sparkConfClone.set(name, value) + } + new YarnAllocator( "not used", mock(classOf[RpcEndpointRef]), @@ -108,14 +117,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter clock) } - def createContainer(host: String): Container = { - // When YARN 2.6+ is required, avoid deprecation by using version with long second arg - val containerId = ContainerId.newInstance(appAttemptId, containerNum) + def createContainer( + host: String, + containerNumber: Int = containerNum, + resource: Resource = containerResource): Container = { + val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum) containerNum += 1 val nodeId = NodeId.newInstance(host, 1000) Container.newInstance(containerId, nodeId, "", containerResource, RM_REQUEST_PRIORITY, null) } + def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = { + hosts.zip(containerIds).map{case (host, id) => createContainer(host, id)} + } + + def createContainerStatus( + containerId: ContainerId, + exitStatus: Int, + containerState: ContainerState = ContainerState.COMPLETE, + diagnostics: String = "diagnostics"): ContainerStatus = { + ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus) + } + + test("single container allocated") { // request a single container and receive it val handler = createAllocator(1) @@ -400,4 +424,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter clock.advance(50 * 1000L) handler.getNumExecutorsFailed should be (0) } + + test("SPARK-26269: YarnAllocator should have same blacklist behaviour with YARN") { + val rmClientSpy = spy(rmClient) + val maxExecutors = 11 + + val handler = createAllocator( + maxExecutors, + rmClientSpy, + Map( + "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true", + "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0")) + handler.updateResourceRequests() + + val hosts = (0 until maxExecutors).map(i => s"host$i") + val ids = 0 to maxExecutors + val containers = createContainers(hosts, ids) + + val nonBlacklistedStatuses = Seq( + ContainerExitStatus.SUCCESS, + ContainerExitStatus.PREEMPTED, + ContainerExitStatus.KILLED_EXCEEDED_VMEM, + ContainerExitStatus.KILLED_EXCEEDED_PMEM, + ContainerExitStatus.KILLED_BY_RESOURCEMANAGER, + ContainerExitStatus.KILLED_BY_APPMASTER, + ContainerExitStatus.KILLED_AFTER_APP_COMPLETION, + ContainerExitStatus.ABORTED, + ContainerExitStatus.DISKS_FAILED) + + val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map { + case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus) + } + + val BLACKLISTED_EXIT_CODE = 1 + val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE) + + val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map { + case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus) + } + + handler.handleAllocatedContainers(containers.slice(0, 9)) + handler.processCompletedContainers(nonBlacklistedContainerStatuses) + verify(rmClientSpy, never()) + .updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList()) + + handler.handleAllocatedContainers(containers.slice(9, 11)) + handler.processCompletedContainers(blacklistedContainerStatuses) + verify(rmClientSpy) + .updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList()) + verify(rmClientSpy) + .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList()) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org