This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d6a5f85  [SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource
d6a5f85 is described below

commit d6a5f859848bbd237e19075dd26e1547fb3af417
Author: wuyi <ngone_5...@163.com>
AuthorDate: Fri Dec 21 13:21:58 2018 -0600

    [SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

    ## What changes were proposed in this pull request?

    As mentioned in JIRA [SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), in order to maximize the use of cluster resources, this PR makes `YarnAllocator` follow the same blacklist behaviour as YARN.

    ## How was this patch tested?

    Added unit tests.

    Closes #23223 from Ngone51/dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN.

    Lead-authored-by: wuyi <ngone_5...@163.com>
    Co-authored-by: Ngone51 <ngone_5...@163.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 32 +++++++--
 .../yarn/YarnAllocatorBlacklistTracker.scala       |  4 +-
 .../yarn/YarnAllocatorBlacklistTrackerSuite.scala  |  2 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 75 +++++++++++++++++++++-
 4 files changed, 101 insertions(+), 12 deletions(-)

diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 54b1ec2..a3feca5 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -607,13 +607,23 @@ private[yarn] class YarnAllocator(
           val message = "Container killed by YARN for exceeding physical memory limits. " +
             s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
           (true, message)
-        case _ =>
-          // all the failures which not covered above, like:
-          // disk failure, kill by app master or resource manager, ...
-          allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
-          (true, "Container marked as failed: " + containerId + onHostStr +
-            ". Exit status: " + completedContainer.getExitStatus +
-            ". Diagnostics: " + completedContainer.getDiagnostics)
+        case other_exit_status =>
+          // SPARK-26269: follow YARN's blacklisting behaviour(see https://github
+          // .com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
+          // oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
+          // ache/hadoop/yarn/util/Apps.java#L273 for details)
+          if (NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
+            (false, s"Container marked as failed: $containerId$onHostStr" +
+              s". Exit status: ${completedContainer.getExitStatus}" +
+              s". Diagnostics: ${completedContainer.getDiagnostics}.")
+          } else {
+            // completed container from a bad node
+            allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
+            (true, s"Container from a bad node: $containerId$onHostStr" +
+              s". Exit status: ${completedContainer.getExitStatus}" +
+              s". Diagnostics: ${completedContainer.getDiagnostics}.")
+          }
       }
 
       if (exitCausedByApp) {
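The heart of the hunk above is a two-way classification of the container exit status. Here is a minimal standalone sketch of that decision, assuming only the real `ContainerExitStatus` constants from hadoop-yarn-api; the `exitCausedByApp` helper and the `ExitStatusClassification` object are hypothetical names used for illustration, not part of the patch:

    import org.apache.hadoop.yarn.api.records.ContainerExitStatus

    object ExitStatusClassification {
      // Mirrors the new NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS constant: exit
      // statuses that YARN treats as neither an application fault nor a node
      // fault, so they should neither count against the application nor
      // blacklist the host.
      val notAppAndSystemFault: Set[Int] = Set(
        ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
        ContainerExitStatus.KILLED_BY_APPMASTER,
        ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
        ContainerExitStatus.ABORTED,
        ContainerExitStatus.DISKS_FAILED)

      // Hypothetical helper: true means the failure is charged to the
      // application and, in the patched allocator, the host is additionally
      // reported to the blacklist tracker as a bad node.
      def exitCausedByApp(exitStatus: Int): Boolean =
        !notAppAndSystemFault.contains(exitStatus)
    }

In other words, an exit status such as `ContainerExitStatus.INVALID`, or any nonzero user exit code not matched by the earlier cases, still counts against the application and now also feeds the allocator's node blacklist, following the Apps.java behaviour cited in the code comment.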
@@ -739,4 +749,12 @@ private object YarnAllocator {
   val MEM_REGEX = "[0-9.]+ [KMG]B"
   val VMEM_EXCEEDED_EXIT_CODE = -103
   val PMEM_EXCEEDED_EXIT_CODE = -104
+
+  val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
+    ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+    ContainerExitStatus.KILLED_BY_APPMASTER,
+    ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+    ContainerExitStatus.ABORTED,
+    ContainerExitStatus.DISKS_FAILED
+  )
 }
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index ceac7cd..268976b 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
     if (removals.nonEmpty) {
       logInfo(s"removing nodes from YARN application master's blacklist: $removals")
     }
-    amClient.updateBlacklist(additions.asJava, removals.asJava)
+    if (additions.nonEmpty || removals.nonEmpty) {
+      amClient.updateBlacklist(additions.asJava, removals.asJava)
+    }
     currentBlacklistedYarnNodes = nodesToBlacklist
   }
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
index aeac68e..2019107 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTrackerSuite.scala
@@ -87,7 +87,7 @@ class YarnAllocatorBlacklistTrackerSuite extends SparkFunSuite with Matchers
     // expired blacklisted nodes (simulating a resource request)
     yarnBlacklistTracker.setSchedulerBlacklistedNodes(Set("host1", "host2"))
     // no change is communicated to YARN regarding the blacklisting
-    verify(amClientMock).updateBlacklist(Collections.emptyList(), Collections.emptyList())
+    verify(amClientMock, times(0)).updateBlacklist(Collections.emptyList(), Collections.emptyList())
   }
 
   test("combining scheduler and allocation blacklist") {
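A related subtlety in the `YarnAllocatorBlacklistTracker` hunk above: `updateBlacklist` is now invoked only when there is an actual change to report, which is why the tracker suite switches from `verify(amClientMock)` to `verify(amClientMock, times(0))`. A sketch of the guard in isolation, using the real `AMRMClient.updateBlacklist` API from hadoop-yarn-client; the `refreshBlacklist` wrapper and `BlacklistSync` object are hypothetical and exist only to frame the example:

    import scala.collection.JavaConverters._

    import org.apache.hadoop.yarn.client.api.AMRMClient
    import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

    object BlacklistSync {
      // Only call into YARN when the blacklist actually changed; a no-op
      // updateBlacklist(emptyList, emptyList) on every synchronization adds
      // nothing and trips mock verifications in tests that expect no
      // interaction with the AM-RM client.
      def refreshBlacklist(
          amClient: AMRMClient[ContainerRequest],
          additions: Seq[String],
          removals: Seq[String]): Unit = {
        if (additions.nonEmpty || removals.nonEmpty) {
          amClient.updateBlacklist(additions.asJava, removals.asJava)
        }
      }
    }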
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index b61e7df..53a538d 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.deploy.yarn
 
+import java.util.Collections
+
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
@@ -114,13 +116,29 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       clock)
   }
 
-  def createContainer(host: String, resource: Resource = containerResource): Container = {
-    val containerId = ContainerId.newContainerId(appAttemptId, containerNum)
+  def createContainer(
+      host: String,
+      containerNumber: Int = containerNum,
+      resource: Resource = containerResource): Container = {
+    val containerId: ContainerId = ContainerId.newContainerId(appAttemptId, containerNum)
     containerNum += 1
     val nodeId = NodeId.newInstance(host, 1000)
     Container.newInstance(containerId, nodeId, "", resource, RM_REQUEST_PRIORITY, null)
   }
 
+  def createContainers(hosts: Seq[String], containerIds: Seq[Int]): Seq[Container] = {
+    hosts.zip(containerIds).map{case (host, id) => createContainer(host, id)}
+  }
+
+  def createContainerStatus(
+      containerId: ContainerId,
+      exitStatus: Int,
+      containerState: ContainerState = ContainerState.COMPLETE,
+      diagnostics: String = "diagnostics"): ContainerStatus = {
+    ContainerStatus.newInstance(containerId, containerState, diagnostics, exitStatus)
+  }
+
   test("single container allocated") {
     // request a single container and receive it
     val handler = createAllocator(1)
@@ -148,7 +166,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
       Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
     handler.updateResourceRequests()
 
-    val container = createContainer("host1", handler.resource)
+    val container = createContainer("host1", resource = handler.resource)
     handler.handleAllocatedContainers(Array(container))
 
     // get amount of memory and vcores from resource, so effectively skipping their validation
@@ -417,4 +435,55 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
     clock.advance(50 * 1000L)
     handler.getNumExecutorsFailed should be (0)
   }
+
+  test("SPARK-26269: YarnAllocator should have same blacklist behaviour with YARN") {
+    val rmClientSpy = spy(rmClient)
+    val maxExecutors = 11
+
+    val handler = createAllocator(
+      maxExecutors,
+      rmClientSpy,
+      Map(
+        "spark.yarn.blacklist.executor.launch.blacklisting.enabled" -> "true",
+        "spark.blacklist.application.maxFailedExecutorsPerNode" -> "0"))
+    handler.updateResourceRequests()
+
+    val hosts = (0 until maxExecutors).map(i => s"host$i")
+    val ids = 0 to maxExecutors
+    val containers = createContainers(hosts, ids)
+
+    val nonBlacklistedStatuses = Seq(
+      ContainerExitStatus.SUCCESS,
+      ContainerExitStatus.PREEMPTED,
+      ContainerExitStatus.KILLED_EXCEEDED_VMEM,
+      ContainerExitStatus.KILLED_EXCEEDED_PMEM,
+      ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+      ContainerExitStatus.KILLED_BY_APPMASTER,
+      ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+      ContainerExitStatus.ABORTED,
+      ContainerExitStatus.DISKS_FAILED)
+
+    val nonBlacklistedContainerStatuses = nonBlacklistedStatuses.zipWithIndex.map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    val BLACKLISTED_EXIT_CODE = 1
+    val blacklistedStatuses = Seq(ContainerExitStatus.INVALID, BLACKLISTED_EXIT_CODE)
+
+    val blacklistedContainerStatuses = blacklistedStatuses.zip(9 until maxExecutors).map {
+      case (exitStatus, idx) => createContainerStatus(containers(idx).getId, exitStatus)
+    }
+
+    handler.handleAllocatedContainers(containers.slice(0, 9))
+    handler.processCompletedContainers(nonBlacklistedContainerStatuses)
+    verify(rmClientSpy, never())
+      .updateBlacklist(hosts.slice(0, 9).asJava, Collections.emptyList())
+
+    handler.handleAllocatedContainers(containers.slice(9, 11))
+    handler.processCompletedContainers(blacklistedContainerStatuses)
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(9, 10).asJava, Collections.emptyList())
+    verify(rmClientSpy)
+      .updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org