This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 5cc990f5ba7 [SPARK-44091][YARN][TESTS] Introduce `withResourceTypes` to `ResourceRequestTestHelper` to restore `resourceTypes` as default value after testing 5cc990f5ba7 is described below commit 5cc990f5ba716edcb6e0e05075e28a1ce604fd22 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Sun Aug 27 21:35:13 2023 -0700 [SPARK-44091][YARN][TESTS] Introduce `withResourceTypes` to `ResourceRequestTestHelper` to restore `resourceTypes` as default value after testing ### What changes were proposed in this pull request? This pr converts `ResourceRequestTestHelper` from an `object` to a `trait` and introduces a new function named `withResourceTypes` to `ResourceRequestTestHelper` to restore `resourceTypes` to its default value after testing. ### Why are the changes needed? When testing the yarn module with the command `build/sbt "yarn/test" -Pyarn -Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest`, some tests fail in `YarnClusterSuite`, like ``` [info] YarnClusterSuite: [info] - run Spark in yarn-client mode *** FAILED *** (3 seconds, 125 milliseconds) [info] FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:238) [info] org.scalatest.exceptions.TestFailedException: [info] at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472) [info] at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471) [info] at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231) [info] at org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295) [info] at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.checkResult(BaseYarnClusterSuite.scala:238) [info] at org.apache.spark.deploy.yarn.YarnClusterSuite.testBasicYarnApp(YarnClusterSuite.scala:350) [info] at org.apache.spark.deploy.yarn.YarnClusterSuite.$anonfun$new$1(YarnClusterSuite.scala:95) [info] at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) [info] at org.apache.spark.deploy.yarn.BaseYarnClusterSuite.$anonfun$test$1(BaseYarnClusterSuite.scala:77) [info] at org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127) [info] at org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282) [info] at org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231) [info] at org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230) [info] at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69) [info] at org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155) [info] at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85) [info] at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83) [info] at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104) [info] at org.scalatest.Transformer.apply(Transformer.scala:22) [info] at org.scalatest.Transformer.apply(Transformer.scala:20) ... ``` and the following error logs will in `unit-tests.log`: ``` 23/06/20 16:56:38.056 IPC Server handler 10 on default port 49553 WARN Server: IPC Server handler 10 on default port 49553, call Call#3 Retry#0 org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport from 127.0.0.1:49561 org.apache.hadoop.yarn.exceptions.ResourceNotFoundException: The resource manager encountered a problem that should not occur under normal circumstances. 
Please report this error to the Hadoop community by opening a JIRA ticket at http://issues.apache.org/jira and including the following information: * Resource type requested: custom-resource-type-1 * Resource object: <memory:-1, vCores:-1> * The stack trace for this exception: java.lang.Exception at org.apache.hadoop.yarn.exceptions.ResourceNotFoundException.<init>(ResourceNotFoundException.java:47) at org.apache.hadoop.yarn.api.records.Resource.getResourceInformation(Resource.java:264) at org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.setResourceInformation(ResourcePBImpl.java:213) at org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.getProto(ResourcePBImpl.java:61) at org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils.convertToProtoFormat(ProtoUtils.java:463) at org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.convertToProtoFormat(ApplicationResourceUsageReportPBImpl.java:289) at org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.mergeLocalToBuilder(ApplicationResourceUsageReportPBImpl.java:91) at org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.mergeLocalToProto(ApplicationResourceUsageReportPBImpl.java:122) at org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.getProto(ApplicationResourceUsageReportPBImpl.java:63) at org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils.convertToProtoFormat(ProtoUtils.java:247) at org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl.convertToProtoFormat(ApplicationReportPBImpl.java:560) at org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl.setApplicationResourceUsageReport(ApplicationReportPBImpl.java:100) at org.apache.hadoop.yarn.server.utils.BuilderUtils.newApplicationReport(BuilderUtils.java:406) at org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.createAndGetApplicationReport(RMAppImpl.java:779) at 
org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:429) at org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:247) at org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:615) at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621) at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589) at org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1213) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1089) at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1012) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3026) ... 
``` then mvn test will also have similar failure results ``` build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest ``` ``` YarnClusterSuite: 2023-06-20 09:47:45.531:INFO::ScalaTest-main-running-DiscoverySuite: Logging initialized 7792ms to org.eclipse.jetty.util.log.StdErrLog - run Spark in yarn-client mode *** FAILED *** FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:238) - run Spark in yarn-cluster mode *** FAILED *** FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:238) - run Spark in yarn-client mode with unmanaged am - run Spark in yarn-client mode with different configurations, ensuring redaction - run Spark in yarn-cluster mode with different configurations, ensuring redaction *** FAILED *** FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:238) - yarn-cluster should respect conf overrides in SparkHadoopUtil (SPARK-16414, SPARK-23630) *** FAILED *** FAILED did not equal FINISHED (stdout/stderr was not captured) (BaseYarnClusterSuite.scala:238) ``` The call to `ResourceUtils.reinitializeResources` will fill private static variable contents of `ResourceUtils`, like: ``` private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX = new ConcurrentHashMap<String, Integer>(); private static volatile Map<String, ResourceInformation> resourceTypes; private static volatile Map<String, ResourceInformation> nonCountableResourceTypes; private static volatile ResourceInformation[] resourceTypesArray; private static volatile Map<String, ResourceInformation> readOnlyNodeResources; ``` and these static variable will not be cleaned up automatically, this may cause different test cases to misuse these shared variables and unexpectedly fail in certain specific scenarios, so this pr use the new function `withResourceTypes` to restore `resourceTypes` as default value after testing. 
### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Manual checked ``` build/sbt "yarn/test" -Pyarn -Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest ``` and ``` build/mvn clean install -pl resource-managers/yarn -Pyarn -Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest ``` can run successfully after this pr Closes #41673 from LuciferYang/SPARK-44091. Lead-authored-by: yangjie01 <yangji...@baidu.com> Co-authored-by: YangJie <yangji...@baidu.com> Signed-off-by: Dongjoon Hyun <dh...@apple.com> (cherry picked from commit de96a86d9dc3b32d87deb5a49a4a2d0f6add98a0) Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../org/apache/spark/deploy/yarn/ClientSuite.scala | 172 +++++++------- .../deploy/yarn/ResourceRequestHelperSuite.scala | 37 +-- .../deploy/yarn/ResourceRequestTestHelper.scala | 13 +- .../spark/deploy/yarn/YarnAllocatorSuite.scala | 252 +++++++++++---------- 4 files changed, 250 insertions(+), 224 deletions(-) diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala index ba116c27716..b7fb409ebc3 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala @@ -55,7 +55,9 @@ import org.apache.spark.resource.ResourceID import org.apache.spark.resource.ResourceUtils.AMOUNT import org.apache.spark.util.{SparkConfWithEnv, Utils} -class ClientSuite extends SparkFunSuite with Matchers { +class ClientSuite extends SparkFunSuite + with Matchers + with ResourceRequestTestHelper { private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, Seq.empty: _*) import Client._ @@ -473,24 +475,24 @@ class ClientSuite extends SparkFunSuite with Matchers { ).foreach { case (deployMode, prefix) => test(s"custom resource request 
($deployMode mode)") { val resources = Map("fpga" -> 2, "gpu" -> 3) - ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) + withResourceTypes(resources.keys.toSeq) { + val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode) + resources.foreach { case (name, v) => + conf.set(s"${prefix}${name}.${AMOUNT}", v.toString) + } - val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode) - resources.foreach { case (name, v) => - conf.set(s"${prefix}${name}.${AMOUNT}", v.toString) - } + val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) + val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) + val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) - val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) - val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) - val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + val client = new Client(new ClientArguments(Array()), conf, null) + client.createApplicationSubmissionContext( + new YarnClientApplication(getNewApplicationResponse, appContext), + containerLaunchContext) - val client = new Client(new ClientArguments(Array()), conf, null) - client.createApplicationSubmissionContext( - new YarnClientApplication(getNewApplicationResponse, appContext), - containerLaunchContext) - - resources.foreach { case (name, value) => - appContext.getResource.getResourceInformation(name).getValue should be (value) + resources.foreach { case (name, value) => + appContext.getResource.getResourceInformation(name).getValue should be (value) + } } } } @@ -498,43 +500,45 @@ class ClientSuite extends SparkFunSuite with Matchers { test("custom driver resource request yarn config and spark config fails") { val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", conf.get(YARN_FPGA_DEVICE) -> "fpga") - 
ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) - resources.keys.foreach { yarnName => - conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2") - } - resources.values.foreach { rName => - conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") - } + withResourceTypes(resources.keys.toSeq) { + resources.keys.foreach { yarnName => + conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2") + } + resources.values.foreach { rName => + conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") + } - val error = intercept[SparkException] { - ResourceRequestHelper.validateResources(conf) - }.getMessage() + val error = intercept[SparkException] { + ResourceRequestHelper.validateResources(conf) + }.getMessage() - assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga.amount," + - " please use spark.driver.resource.fpga.amount")) - assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu.amount," + - " please use spark.driver.resource.gpu.amount")) + assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga.amount," + + " please use spark.driver.resource.fpga.amount")) + assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu.amount," + + " please use spark.driver.resource.gpu.amount")) + } } test("custom executor resource request yarn config and spark config fails") { val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster") val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", conf.get(YARN_FPGA_DEVICE) -> "fpga") - ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) - resources.keys.foreach { yarnName => - conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2") - } - resources.values.foreach { rName => - conf.set(new ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3") - } + withResourceTypes(resources.keys.toSeq) { + resources.keys.foreach { yarnName => + 
conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2") + } + resources.values.foreach { rName => + conf.set(new ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3") + } - val error = intercept[SparkException] { - ResourceRequestHelper.validateResources(conf) - }.getMessage() + val error = intercept[SparkException] { + ResourceRequestHelper.validateResources(conf) + }.getMessage() - assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga.amount," + - " please use spark.executor.resource.fpga.amount")) - assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu.amount," + - " please use spark.executor.resource.gpu.amount")) + assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga.amount," + + " please use spark.executor.resource.fpga.amount")) + assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu.amount," + + " please use spark.executor.resource.gpu.amount")) + } } @@ -545,30 +549,30 @@ class ClientSuite extends SparkFunSuite with Matchers { conf.get(YARN_FPGA_DEVICE) -> "fpga", yarnMadeupResource -> "madeup") - ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) - - resources.values.foreach { rName => - conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") - } - // also just set yarn one that we don't convert - conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}", "5") - val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) - val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) - val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) + withResourceTypes(resources.keys.toSeq) { + resources.values.foreach { rName => + conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") + } + // also just set yarn one that we don't convert + conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}", 
"5") + val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) + val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) + val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) - val client = new Client(new ClientArguments(Array()), conf, null) - val newContext = client.createApplicationSubmissionContext( - new YarnClientApplication(getNewApplicationResponse, appContext), - containerLaunchContext) + val client = new Client(new ClientArguments(Array()), conf, null) + val newContext = client.createApplicationSubmissionContext( + new YarnClientApplication(getNewApplicationResponse, appContext), + containerLaunchContext) - val yarnRInfo = newContext.getResource.getResources - val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap - assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty) - assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3) - assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty) - assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).get === 3) - assert(allResourceInfo.get(yarnMadeupResource).nonEmpty) - assert(allResourceInfo.get(yarnMadeupResource).get === 5) + val yarnRInfo = newContext.getResource.getResources + val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap + assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty) + assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3) + assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty) + assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).get === 3) + assert(allResourceInfo.get(yarnMadeupResource).nonEmpty) + assert(allResourceInfo.get(yarnMadeupResource).get === 5) + } } test("gpu/fpga spark resources mapped to custom yarn resources") { @@ -579,26 +583,26 @@ class ClientSuite extends SparkFunSuite with Matchers { conf.set(YARN_FPGA_DEVICE.key, fpgaCustomName) val resources = Map(gpuCustomName -> "gpu", 
fpgaCustomName -> "fpga") + withResourceTypes(resources.keys.toSeq) { + resources.values.foreach { rName => + conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") + } + val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) + val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) + val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) - ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq) - resources.values.foreach { rName => - conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3") - } - val appContext = Records.newRecord(classOf[ApplicationSubmissionContext]) - val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse]) - val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext]) - - val client = new Client(new ClientArguments(Array()), conf, null) - val newContext = client.createApplicationSubmissionContext( - new YarnClientApplication(getNewApplicationResponse, appContext), - containerLaunchContext) + val client = new Client(new ClientArguments(Array()), conf, null) + val newContext = client.createApplicationSubmissionContext( + new YarnClientApplication(getNewApplicationResponse, appContext), + containerLaunchContext) - val yarnRInfo = newContext.getResource.getResources - val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap - assert(allResourceInfo.get(gpuCustomName).nonEmpty) - assert(allResourceInfo.get(gpuCustomName).get === 3) - assert(allResourceInfo.get(fpgaCustomName).nonEmpty) - assert(allResourceInfo.get(fpgaCustomName).get === 3) + val yarnRInfo = newContext.getResource.getResources + val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap + assert(allResourceInfo.get(gpuCustomName).nonEmpty) + assert(allResourceInfo.get(gpuCustomName).get === 3) + assert(allResourceInfo.get(fpgaCustomName).nonEmpty) + 
assert(allResourceInfo.get(fpgaCustomName).get === 3) + } } test("test yarn jars path not exists") { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala index 53b6d192e72..56ec5a801e4 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala @@ -28,7 +28,9 @@ import org.apache.spark.deploy.yarn.config._ import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY} import org.apache.spark.resource.ResourceUtils.AMOUNT -class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { +class ResourceRequestHelperSuite extends SparkFunSuite + with Matchers + with ResourceRequestTestHelper { private val CUSTOM_RES_1 = "custom-resource-type-1" private val CUSTOM_RES_2 = "custom-resource-type-2" @@ -104,17 +106,16 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { val requests = resources.map { case (rName, rValue, rUnit) => (rName, rValue.toString + rUnit) }.toMap - - ResourceRequestTestHelper.initializeResourceTypes(resourceDefs) - - val resource = createResource() - setResourceRequests(requests, resource) - - resources.foreach { case (rName, rValue, rUnit) => - val requested = resource.getResourceInformation(rName) - assert(requested.getName === rName) - assert(requested.getValue === rValue) - assert(requested.getUnits === rUnit) + withResourceTypes(resourceDefs) { + val resource = createResource() + setResourceRequests(requests, resource) + + resources.foreach { case (rName, rValue, rUnit) => + val requested = resource.getResourceInformation(rName) + assert(requested.getName === rName) + assert(requested.getValue === rValue) + assert(requested.getUnits === rUnit) + } } } } @@ 
-125,13 +126,13 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers { ("invalid unit", CUSTOM_RES_1, "123ppp") ).foreach { case (name, key, value) => test(s"invalid request: $name") { - ResourceRequestTestHelper.initializeResourceTypes(Seq(key)) - - val resource = createResource() - val thrown = intercept[IllegalArgumentException] { - setResourceRequests(Map(key -> value), resource) + withResourceTypes(Seq(key)) { + val resource = createResource() + val thrown = intercept[IllegalArgumentException] { + setResourceRequests(Map(key -> value), resource) + } + thrown.getMessage should include (key) } - thrown.getMessage should include (key) } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala index 19c842f0928..826bded50f8 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala @@ -22,7 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.hadoop.yarn.api.records.ResourceTypeInfo import org.apache.hadoop.yarn.util.resource.ResourceUtils -object ResourceRequestTestHelper { +trait ResourceRequestTestHelper { def initializeResourceTypes(resourceTypes: Seq[String]): Unit = { // ResourceUtils.reinitializeResources() is the YARN-way // to specify resources for the execution of the tests. @@ -38,4 +38,15 @@ object ResourceRequestTestHelper { ResourceUtils.reinitializeResources(allResourceTypes.asJava) } + + /** + * `initializeResourceTypes` with inputs, call `f` and + * restore `resourceTypes` as default value. 
+ */ + def withResourceTypes(resourceTypes: Seq[String])(f: => Unit): Unit = { + initializeResourceTypes(resourceTypes) + try f finally { + initializeResourceTypes(Seq.empty) + } + } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala index 055edfbf767..f6f2e1b11d5 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala @@ -62,7 +62,10 @@ class MockResolver extends SparkRackResolver(SparkHadoopUtil.get.conf) { } -class YarnAllocatorSuite extends SparkFunSuite with Matchers with PrivateMethodTester { +class YarnAllocatorSuite extends SparkFunSuite + with Matchers + with PrivateMethodTester + with ResourceRequestTestHelper { val conf = new YarnConfiguration() val sparkConf = new SparkConf() sparkConf.set(DRIVER_HOST_ADDRESS, "localhost") @@ -191,135 +194,141 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with PrivateMethodT test("single container allocated with ResourceProfile") { val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE)) - ResourceRequestTestHelper.initializeResourceTypes(yarnResources) - // create default profile so we get a different id to test below - val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf) - val execReq = new ExecutorResourceRequests().resource("gpu", 6) - val taskReq = new TaskResourceRequests().resource("gpu", 1) - val rprof = new ResourceProfile(execReq.requests, taskReq.requests) - // request a single container and receive it - val (handler, _) = createAllocator(0) + withResourceTypes(yarnResources) { + // create default profile so we get a different id to test below + val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val execReq = new ExecutorResourceRequests().resource("gpu", 6) + 
val taskReq = new TaskResourceRequests().resource("gpu", 1) + val rprof = new ResourceProfile(execReq.requests, taskReq.requests) + // request a single container and receive it + val (handler, _) = createAllocator(0) + + val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1) + val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0) + handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap, + numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty) - val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1) - val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0) - handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap, - numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumContainersPendingAllocate should be (1) - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getNumContainersPendingAllocate should be (1) + val container = createContainer("host1", priority = Priority.newInstance(rprof.id)) + handler.handleAllocatedContainers(Array(container)) - val container = createContainer("host1", priority = Priority.newInstance(rprof.id)) - handler.handleAllocatedContainers(Array(container)) + handler.getNumExecutorsRunning should be (1) + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id) + hostTocontainer.get("host1").get should contain(container.getId) - handler.getNumExecutorsRunning should be (1) - handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") - val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id) - hostTocontainer.get("host1").get should contain(container.getId) - - val size = 
rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size - size should be (0) + val size = + rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size + size should be (0) - ResourceProfile.reInitDefaultProfile(sparkConf) + ResourceProfile.reInitDefaultProfile(sparkConf) + } } test("multiple containers allocated with ResourceProfiles") { val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), sparkConf.get(YARN_FPGA_DEVICE)) - ResourceRequestTestHelper.initializeResourceTypes(yarnResources) - // create default profile so we get a different id to test below - val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf) - val execReq = new ExecutorResourceRequests().resource("gpu", 6) - val taskReq = new TaskResourceRequests().resource("gpu", 1) - val rprof = new ResourceProfile(execReq.requests, taskReq.requests) + withResourceTypes(yarnResources) { + // create default profile so we get a different id to test below + val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf) + val execReq = new ExecutorResourceRequests().resource("gpu", 6) + val taskReq = new TaskResourceRequests().resource("gpu", 1) + val rprof = new ResourceProfile(execReq.requests, taskReq.requests) - val execReq2 = new ExecutorResourceRequests().memory("8g").resource("fpga", 2) - val taskReq2 = new TaskResourceRequests().resource("fpga", 1) - val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests) + val execReq2 = new ExecutorResourceRequests().memory("8g").resource("fpga", 2) + val taskReq2 = new TaskResourceRequests().resource("fpga", 1) + val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests) - // request a single container and receive it - val (handler, _) = createAllocator(1) - val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1, rprof2 -> 2) - val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0, rprof2.id -> 0) - 
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap, - numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty) - - handler.updateResourceRequests() - handler.getNumExecutorsRunning should be (0) - handler.getNumContainersPendingAllocate should be (3) - - val containerResourcerp2 = Resource.newInstance(10240, 5) - - val container = createContainer("host1", priority = Priority.newInstance(rprof.id)) - val container2 = createContainer("host2", resource = containerResourcerp2, - priority = Priority.newInstance(rprof2.id)) - val container3 = createContainer("host3", resource = containerResourcerp2, - priority = Priority.newInstance(rprof2.id)) - handler.handleAllocatedContainers(Array(container, container2, container3)) + // request a single container and receive it + val (handler, _) = createAllocator(1) + val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof -> 1, rprof2 -> 2) + val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id -> 0, rprof2.id -> 0) + handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap, + numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty) - handler.getNumExecutorsRunning should be (3) - handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") - handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2") - handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host3") - - val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id) - hostTocontainer.get("host1").get should contain(container.getId) - val hostTocontainer2 = handler.allocatedHostToContainersMapPerRPId(rprof2.id) - hostTocontainer2.get("host2").get should contain(container2.getId) - hostTocontainer2.get("host3").get should contain(container3.getId) - - val size = rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size - size should be 
(0) - - ResourceProfile.reInitDefaultProfile(sparkConf) + handler.updateResourceRequests() + handler.getNumExecutorsRunning should be (0) + handler.getNumContainersPendingAllocate should be (3) + + val containerResourcerp2 = Resource.newInstance(10240, 5) + + val container = createContainer("host1", priority = Priority.newInstance(rprof.id)) + val container2 = createContainer("host2", resource = containerResourcerp2, + priority = Priority.newInstance(rprof2.id)) + val container3 = createContainer("host3", resource = containerResourcerp2, + priority = Priority.newInstance(rprof2.id)) + handler.handleAllocatedContainers(Array(container, container2, container3)) + + handler.getNumExecutorsRunning should be (3) + handler.allocatedContainerToHostMap.get(container.getId).get should be ("host1") + handler.allocatedContainerToHostMap.get(container2.getId).get should be ("host2") + handler.allocatedContainerToHostMap.get(container3.getId).get should be ("host3") + + val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id) + hostTocontainer.get("host1").get should contain(container.getId) + val hostTocontainer2 = handler.allocatedHostToContainersMapPerRPId(rprof2.id) + hostTocontainer2.get("host2").get should contain(container2.getId) + hostTocontainer2.get("host3").get should contain(container3.getId) + + val size = + rmClient.getMatchingRequests(container.getPriority, "host1", containerResource).size + size should be (0) + + ResourceProfile.reInitDefaultProfile(sparkConf) + } } test("custom resource requested from yarn") { - ResourceRequestTestHelper.initializeResourceTypes(List("gpu")) - - val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) - val (handler, _) = createAllocator(1, mockAmClient, - Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G")) - - handler.updateResourceRequests() - val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) - val container = createContainer("host1", resource = defaultResource) - 
handler.handleAllocatedContainers(Array(container)) - - // get amount of memory and vcores from resource, so effectively skipping their validation - val expectedResources = Resource.newInstance(defaultResource.getMemorySize(), - defaultResource.getVirtualCores) - setResourceRequests(Map("gpu" -> "2G"), expectedResources) - val captor = ArgumentCaptor.forClass(classOf[ContainerRequest]) + withResourceTypes(List("gpu")) { + val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) + val (handler, _) = createAllocator(1, mockAmClient, + Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G")) - verify(mockAmClient).addContainerRequest(captor.capture()) - val containerRequest: ContainerRequest = captor.getValue - assert(containerRequest.getCapability === expectedResources) + handler.updateResourceRequests() + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val container = createContainer("host1", resource = defaultResource) + handler.handleAllocatedContainers(Array(container)) + + // get amount of memory and vcores from resource, so effectively skipping their validation + val expectedResources = Resource.newInstance(defaultResource.getMemorySize(), + defaultResource.getVirtualCores) + setResourceRequests(Map("gpu" -> "2G"), expectedResources) + val captor = ArgumentCaptor.forClass(classOf[ContainerRequest]) + + verify(mockAmClient).addContainerRequest(captor.capture()) + val containerRequest: ContainerRequest = captor.getValue + assert(containerRequest.getCapability === expectedResources) + } } test("custom spark resource mapped to yarn resource configs") { val yarnMadeupResource = "yarn.io/madeup" val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), sparkConf.get(YARN_FPGA_DEVICE), yarnMadeupResource) - ResourceRequestTestHelper.initializeResourceTypes(yarnResources) - val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) - val madeupConfigName = 
s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}" - val sparkResources = - Map(EXECUTOR_GPU_ID.amountConf -> "3", - EXECUTOR_FPGA_ID.amountConf -> "2", - madeupConfigName -> "5") - val (handler, _) = createAllocator(1, mockAmClient, sparkResources) + withResourceTypes(yarnResources) { + val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) + val madeupConfigName = + s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}" + val sparkResources = + Map(EXECUTOR_GPU_ID.amountConf -> "3", + EXECUTOR_FPGA_ID.amountConf -> "2", + madeupConfigName -> "5") + val (handler, _) = createAllocator(1, mockAmClient, sparkResources) - handler.updateResourceRequests() - val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) - val yarnRInfo = defaultResource.getResources - val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.getName -> rInfo.getValue) ).toMap - assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty) - assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3) - assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty) - assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).get === 2) - assert(allResourceInfo.get(yarnMadeupResource).nonEmpty) - assert(allResourceInfo.get(yarnMadeupResource).get === 5) + handler.updateResourceRequests() + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val yarnRInfo = defaultResource.getResources + val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.getName -> rInfo.getValue) ).toMap + assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty) + assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3) + assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty) + assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).get === 2) + assert(allResourceInfo.get(yarnMadeupResource).nonEmpty) + assert(allResourceInfo.get(yarnMadeupResource).get === 5) + } } test("gpu/fpga spark 
resource mapped to custom yarn resource") { @@ -331,21 +340,22 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with PrivateMethodT sparkConf.set(YARN_GPU_DEVICE.key, gpuCustomName) sparkConf.set(YARN_FPGA_DEVICE.key, fpgaCustomName) val yarnResources = Seq(gpuCustomName, fpgaCustomName) - ResourceRequestTestHelper.initializeResourceTypes(yarnResources) - val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) - val sparkResources = - Map(EXECUTOR_GPU_ID.amountConf -> "3", - EXECUTOR_FPGA_ID.amountConf -> "2") - val (handler, _) = createAllocator(1, mockAmClient, sparkResources) - - handler.updateResourceRequests() - val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) - val yarnRInfo = defaultResource.getResources - val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap - assert(allResourceInfo.get(gpuCustomName).nonEmpty) - assert(allResourceInfo.get(gpuCustomName).get === 3) - assert(allResourceInfo.get(fpgaCustomName).nonEmpty) - assert(allResourceInfo.get(fpgaCustomName).get === 2) + withResourceTypes(yarnResources) { + val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]]) + val sparkResources = + Map(EXECUTOR_GPU_ID.amountConf -> "3", + EXECUTOR_FPGA_ID.amountConf -> "2") + val (handler, _) = createAllocator(1, mockAmClient, sparkResources) + + handler.updateResourceRequests() + val defaultResource = handler.rpIdToYarnResource.get(defaultRPId) + val yarnRInfo = defaultResource.getResources + val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> rInfo.getValue)).toMap + assert(allResourceInfo.get(gpuCustomName).nonEmpty) + assert(allResourceInfo.get(gpuCustomName).get === 3) + assert(allResourceInfo.get(fpgaCustomName).nonEmpty) + assert(allResourceInfo.get(fpgaCustomName).get === 2) + } } finally { sparkConf.set(YARN_GPU_DEVICE.key, originalGpu) sparkConf.set(YARN_FPGA_DEVICE.key, originalFpga) --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org