This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 5cc990f5ba7 [SPARK-44091][YARN][TESTS] Introduce `withResourceTypes` 
to `ResourceRequestTestHelper` to restore `resourceTypes` to its default value 
after testing
5cc990f5ba7 is described below

commit 5cc990f5ba716edcb6e0e05075e28a1ce604fd22
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Aug 27 21:35:13 2023 -0700

    [SPARK-44091][YARN][TESTS] Introduce `withResourceTypes` to 
`ResourceRequestTestHelper` to restore `resourceTypes` to its default value after 
testing
    
    ### What changes were proposed in this pull request?
    This PR converts `ResourceRequestTestHelper` from an `object` to a `trait` and introduces a new function, `withResourceTypes`, on `ResourceRequestTestHelper` that restores `resourceTypes` to its default value after testing.
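    
    For illustration, a minimal usage sketch (hedged; the test name and body are illustrative, following the suites changed in this PR):
    
    ```
    test("custom resource request") {
      withResourceTypes(Seq("fpga", "gpu")) {
        // exercise code that depends on the custom YARN resource types;
        // `resourceTypes` is restored to its default value on the way out,
        // even if an assertion inside this block throws.
      }
    }
    ```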
    
    ### Why are the changes needed?
    When testing the yarn module with the command `build/sbt "yarn/test" -Pyarn -Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest`, some tests in `YarnClusterSuite` fail like this:
    
    ```
    [info] YarnClusterSuite:
    [info] - run Spark in yarn-client mode *** FAILED *** (3 seconds, 125 
milliseconds)
    [info]   FAILED did not equal FINISHED (stdout/stderr was not captured) 
(BaseYarnClusterSuite.scala:238)
    [info]   org.scalatest.exceptions.TestFailedException:
    [info]   at 
org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:472)
    [info]   at 
org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:471)
    [info]   at 
org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1231)
    [info]   at 
org.scalatest.Assertions$AssertionsHelper.macroAssert(Assertions.scala:1295)
    [info]   at 
org.apache.spark.deploy.yarn.BaseYarnClusterSuite.checkResult(BaseYarnClusterSuite.scala:238)
    [info]   at 
org.apache.spark.deploy.yarn.YarnClusterSuite.testBasicYarnApp(YarnClusterSuite.scala:350)
    [info]   at 
org.apache.spark.deploy.yarn.YarnClusterSuite.$anonfun$new$1(YarnClusterSuite.scala:95)
    [info]   at 
scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    [info]   at 
org.apache.spark.deploy.yarn.BaseYarnClusterSuite.$anonfun$test$1(BaseYarnClusterSuite.scala:77)
    [info]   at 
org.scalatest.enablers.Timed$$anon$1.timeoutAfter(Timed.scala:127)
    [info]   at 
org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:282)
    [info]   at 
org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
    [info]   at 
org.scalatest.concurrent.TimeLimits.failAfter$(TimeLimits.scala:230)
    [info]   at org.apache.spark.SparkFunSuite.failAfter(SparkFunSuite.scala:69)
    [info]   at 
org.apache.spark.SparkFunSuite.$anonfun$test$2(SparkFunSuite.scala:155)
    [info]   at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
    [info]   at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
    [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
    [info]   at org.scalatest.Transformer.apply(Transformer.scala:20)
    ...
    ```
    
    and the following error logs appear in `unit-tests.log`:
    
    ```
    23/06/20 16:56:38.056 IPC Server handler 10 on default port 49553 WARN 
Server: IPC Server handler 10 on default port 49553, call Call#3 Retry#0 
org.apache.hadoop.yarn.api.ApplicationClientProtocolPB.getApplicationReport 
from 127.0.0.1:49561
    org.apache.hadoop.yarn.exceptions.ResourceNotFoundException: The resource 
manager encountered a problem that should not occur under normal circumstances. 
Please report this error to the Hadoop community by opening a JIRA ticket at 
http://issues.apache.org/jira and including the following information:
    * Resource type requested: custom-resource-type-1
    * Resource object: <memory:-1, vCores:-1>
    * The stack trace for this exception: java.lang.Exception
      at 
org.apache.hadoop.yarn.exceptions.ResourceNotFoundException.<init>(ResourceNotFoundException.java:47)
      at 
org.apache.hadoop.yarn.api.records.Resource.getResourceInformation(Resource.java:264)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.setResourceInformation(ResourcePBImpl.java:213)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl.getProto(ResourcePBImpl.java:61)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils.convertToProtoFormat(ProtoUtils.java:463)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.convertToProtoFormat(ApplicationResourceUsageReportPBImpl.java:289)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.mergeLocalToBuilder(ApplicationResourceUsageReportPBImpl.java:91)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.mergeLocalToProto(ApplicationResourceUsageReportPBImpl.java:122)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationResourceUsageReportPBImpl.getProto(ApplicationResourceUsageReportPBImpl.java:63)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils.convertToProtoFormat(ProtoUtils.java:247)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl.convertToProtoFormat(ApplicationReportPBImpl.java:560)
      at 
org.apache.hadoop.yarn.api.records.impl.pb.ApplicationReportPBImpl.setApplicationResourceUsageReport(ApplicationReportPBImpl.java:100)
      at 
org.apache.hadoop.yarn.server.utils.BuilderUtils.newApplicationReport(BuilderUtils.java:406)
      at 
org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl.createAndGetApplicationReport(RMAppImpl.java:779)
      at 
org.apache.hadoop.yarn.server.resourcemanager.ClientRMService.getApplicationReport(ClientRMService.java:429)
      at 
org.apache.hadoop.yarn.api.impl.pb.service.ApplicationClientProtocolPBServiceImpl.getApplicationReport(ApplicationClientProtocolPBServiceImpl.java:247)
      at 
org.apache.hadoop.yarn.proto.ApplicationClientProtocol$ApplicationClientProtocolService$2.callBlockingMethod(ApplicationClientProtocol.java:615)
      at 
org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:621)
      at 
org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:589)
      at 
org.apache.hadoop.ipc.ProtobufRpcEngine2$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine2.java:573)
      at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1213)
      at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1089)
      at org.apache.hadoop.ipc.Server$RpcCall.run(Server.java:1012)
      at java.security.AccessController.doPrivileged(Native Method)
      at javax.security.auth.Subject.doAs(Subject.java:422)
      at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1899)
      at org.apache.hadoop.ipc.Server$Handler.run(Server.java:3026)
    ...
    ```
    
    Running the tests with Maven shows similar failures:
    
    ```
    build/mvn clean install -pl resource-managers/yarn -Pyarn 
-Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest
    ```
    
    ```
    YarnClusterSuite:
    2023-06-20 09:47:45.531:INFO::ScalaTest-main-running-DiscoverySuite: 
Logging initialized 7792ms to org.eclipse.jetty.util.log.StdErrLog
    - run Spark in yarn-client mode *** FAILED ***
      FAILED did not equal FINISHED (stdout/stderr was not captured) 
(BaseYarnClusterSuite.scala:238)
    - run Spark in yarn-cluster mode *** FAILED ***
      FAILED did not equal FINISHED (stdout/stderr was not captured) 
(BaseYarnClusterSuite.scala:238)
    - run Spark in yarn-client mode with unmanaged am
    - run Spark in yarn-client mode with different configurations, ensuring 
redaction
    - run Spark in yarn-cluster mode with different configurations, ensuring 
redaction *** FAILED ***
      FAILED did not equal FINISHED (stdout/stderr was not captured) 
(BaseYarnClusterSuite.scala:238)
    - yarn-cluster should respect conf overrides in SparkHadoopUtil 
(SPARK-16414, SPARK-23630) *** FAILED ***
      FAILED did not equal FINISHED (stdout/stderr was not captured) 
(BaseYarnClusterSuite.scala:238)
    ```
    
    The call to `ResourceUtils.reinitializeResources` populates private static fields of `ResourceUtils`, such as:
    
    ```
    private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX = new 
ConcurrentHashMap<String, Integer>();
    private static volatile Map<String, ResourceInformation> resourceTypes;
    private static volatile Map<String, ResourceInformation> 
nonCountableResourceTypes;
    private static volatile ResourceInformation[] resourceTypesArray;
    private static volatile Map<String, ResourceInformation> 
readOnlyNodeResources;
    ```
    
    These static fields are not cleaned up automatically, so different test cases can misuse the shared state and fail unexpectedly in specific scenarios. This PR therefore uses the new `withResourceTypes` function to restore `resourceTypes` to its default value after testing.
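    
    A minimal sketch of the loan pattern this PR adds (it mirrors the `ResourceRequestTestHelper` change in the diff below; `initializeResourceTypes` is the pre-existing helper that calls `ResourceUtils.reinitializeResources`):
    
    ```
    def withResourceTypes(resourceTypes: Seq[String])(f: => Unit): Unit = {
      // Register the custom resource types for the duration of `f`.
      initializeResourceTypes(resourceTypes)
      try f finally {
        // Re-register only the mandatory types, restoring the static maps
        // inside ResourceUtils to their default contents.
        initializeResourceTypes(Seq.empty)
      }
    }
    ```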
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Passed GitHub Actions
    - Manually checked:
    
    ```
    build/sbt "yarn/test" -Pyarn 
-Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest
    ```
    
    and
    
    ```
    build/mvn clean install -pl resource-managers/yarn -Pyarn 
-Dtest.exclude.tags=org.apache.spark.tags.ExtendedLevelDBTest
    ```
    
    both run successfully after this PR.
    
    Closes #41673 from LuciferYang/SPARK-44091.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
    (cherry picked from commit de96a86d9dc3b32d87deb5a49a4a2d0f6add98a0)
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 172 +++++++-------
 .../deploy/yarn/ResourceRequestHelperSuite.scala   |  37 +--
 .../deploy/yarn/ResourceRequestTestHelper.scala    |  13 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala     | 252 +++++++++++----------
 4 files changed, 250 insertions(+), 224 deletions(-)

diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index ba116c27716..b7fb409ebc3 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -55,7 +55,9 @@ import org.apache.spark.resource.ResourceID
 import org.apache.spark.resource.ResourceUtils.AMOUNT
 import org.apache.spark.util.{SparkConfWithEnv, Utils}
 
-class ClientSuite extends SparkFunSuite with Matchers {
+class ClientSuite extends SparkFunSuite
+    with Matchers
+    with ResourceRequestTestHelper {
   private def doReturn(value: Any) = org.mockito.Mockito.doReturn(value, 
Seq.empty: _*)
 
   import Client._
@@ -473,24 +475,24 @@ class ClientSuite extends SparkFunSuite with Matchers {
   ).foreach { case (deployMode, prefix) =>
     test(s"custom resource request ($deployMode mode)") {
       val resources = Map("fpga" -> 2, "gpu" -> 3)
-      ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
+      withResourceTypes(resources.keys.toSeq) {
+        val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
+        resources.foreach { case (name, v) =>
+          conf.set(s"${prefix}${name}.${AMOUNT}", v.toString)
+        }
 
-      val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
-      resources.foreach { case (name, v) =>
-        conf.set(s"${prefix}${name}.${AMOUNT}", v.toString)
-      }
+        val appContext = 
Records.newRecord(classOf[ApplicationSubmissionContext])
+        val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+        val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
 
-      val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-      val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
-      val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+        val client = new Client(new ClientArguments(Array()), conf, null)
+        client.createApplicationSubmissionContext(
+          new YarnClientApplication(getNewApplicationResponse, appContext),
+          containerLaunchContext)
 
-      val client = new Client(new ClientArguments(Array()), conf, null)
-      client.createApplicationSubmissionContext(
-        new YarnClientApplication(getNewApplicationResponse, appContext),
-        containerLaunchContext)
-
-      resources.foreach { case (name, value) =>
-        appContext.getResource.getResourceInformation(name).getValue should be 
(value)
+        resources.foreach { case (name, value) =>
+          appContext.getResource.getResourceInformation(name).getValue should 
be (value)
+        }
       }
     }
   }
@@ -498,43 +500,45 @@ class ClientSuite extends SparkFunSuite with Matchers {
   test("custom driver resource request yarn config and spark config fails") {
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", 
conf.get(YARN_FPGA_DEVICE) -> "fpga")
-    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
-    resources.keys.foreach { yarnName =>
-      conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", 
"2")
-    }
-    resources.values.foreach { rName =>
-      conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
-    }
+    withResourceTypes(resources.keys.toSeq) {
+      resources.keys.foreach { yarnName =>
+        conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", 
"2")
+      }
+      resources.values.foreach { rName =>
+        conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
+      }
 
-    val error = intercept[SparkException] {
-      ResourceRequestHelper.validateResources(conf)
-    }.getMessage()
+      val error = intercept[SparkException] {
+        ResourceRequestHelper.validateResources(conf)
+      }.getMessage()
 
-    assert(error.contains("Do not use 
spark.yarn.driver.resource.yarn.io/fpga.amount," +
-      " please use spark.driver.resource.fpga.amount"))
-    assert(error.contains("Do not use 
spark.yarn.driver.resource.yarn.io/gpu.amount," +
-      " please use spark.driver.resource.gpu.amount"))
+      assert(error.contains("Do not use 
spark.yarn.driver.resource.yarn.io/fpga.amount," +
+        " please use spark.driver.resource.fpga.amount"))
+      assert(error.contains("Do not use 
spark.yarn.driver.resource.yarn.io/gpu.amount," +
+        " please use spark.driver.resource.gpu.amount"))
+    }
   }
 
   test("custom executor resource request yarn config and spark config fails") {
     val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
     val resources = Map(conf.get(YARN_GPU_DEVICE) -> "gpu", 
conf.get(YARN_FPGA_DEVICE) -> "fpga")
-    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
-    resources.keys.foreach { yarnName =>
-      conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", 
"2")
-    }
-    resources.values.foreach { rName =>
-      conf.set(new ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3")
-    }
+    withResourceTypes(resources.keys.toSeq) {
+      resources.keys.foreach { yarnName =>
+        
conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
+      }
+      resources.values.foreach { rName =>
+        conf.set(new ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3")
+      }
 
-    val error = intercept[SparkException] {
-      ResourceRequestHelper.validateResources(conf)
-    }.getMessage()
+      val error = intercept[SparkException] {
+        ResourceRequestHelper.validateResources(conf)
+      }.getMessage()
 
-    assert(error.contains("Do not use 
spark.yarn.executor.resource.yarn.io/fpga.amount," +
-      " please use spark.executor.resource.fpga.amount"))
-    assert(error.contains("Do not use 
spark.yarn.executor.resource.yarn.io/gpu.amount," +
-      " please use spark.executor.resource.gpu.amount"))
+      assert(error.contains("Do not use 
spark.yarn.executor.resource.yarn.io/fpga.amount," +
+        " please use spark.executor.resource.fpga.amount"))
+      assert(error.contains("Do not use 
spark.yarn.executor.resource.yarn.io/gpu.amount," +
+        " please use spark.executor.resource.gpu.amount"))
+    }
   }
 
 
@@ -545,30 +549,30 @@ class ClientSuite extends SparkFunSuite with Matchers {
       conf.get(YARN_FPGA_DEVICE) -> "fpga",
       yarnMadeupResource -> "madeup")
 
-    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
-
-    resources.values.foreach { rName =>
-      conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
-    }
-    // also just set yarn one that we don't convert
-    
conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}",
 "5")
-    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-    val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
-    val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
+    withResourceTypes(resources.keys.toSeq) {
+      resources.values.foreach { rName =>
+        conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
+      }
+      // also just set yarn one that we don't convert
+      
conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}",
 "5")
+      val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+      val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+      val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
 
-    val client = new Client(new ClientArguments(Array()), conf, null)
-    val newContext = client.createApplicationSubmissionContext(
-      new YarnClientApplication(getNewApplicationResponse, appContext),
-      containerLaunchContext)
+      val client = new Client(new ClientArguments(Array()), conf, null)
+      val newContext = client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
 
-    val yarnRInfo = newContext.getResource.getResources
-    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
-    assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty)
-    assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3)
-    assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty)
-    assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).get === 3)
-    assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
-    assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+      val yarnRInfo = newContext.getResource.getResources
+      val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
+      assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).nonEmpty)
+      assert(allResourceInfo.get(conf.get(YARN_GPU_DEVICE)).get === 3)
+      assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).nonEmpty)
+      assert(allResourceInfo.get(conf.get(YARN_FPGA_DEVICE)).get === 3)
+      assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
+      assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+    }
   }
 
   test("gpu/fpga spark resources mapped to custom yarn resources") {
@@ -579,26 +583,26 @@ class ClientSuite extends SparkFunSuite with Matchers {
     conf.set(YARN_FPGA_DEVICE.key, fpgaCustomName)
     val resources = Map(gpuCustomName -> "gpu",
       fpgaCustomName -> "fpga")
+    withResourceTypes(resources.keys.toSeq) {
+      resources.values.foreach { rName =>
+        conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
+      }
+      val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
+      val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
+      val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
 
-    ResourceRequestTestHelper.initializeResourceTypes(resources.keys.toSeq)
-    resources.values.foreach { rName =>
-      conf.set(new ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
-    }
-    val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
-    val getNewApplicationResponse = 
Records.newRecord(classOf[GetNewApplicationResponse])
-    val containerLaunchContext = 
Records.newRecord(classOf[ContainerLaunchContext])
-
-    val client = new Client(new ClientArguments(Array()), conf, null)
-    val newContext = client.createApplicationSubmissionContext(
-      new YarnClientApplication(getNewApplicationResponse, appContext),
-      containerLaunchContext)
+      val client = new Client(new ClientArguments(Array()), conf, null)
+      val newContext = client.createApplicationSubmissionContext(
+        new YarnClientApplication(getNewApplicationResponse, appContext),
+        containerLaunchContext)
 
-    val yarnRInfo = newContext.getResource.getResources
-    val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
-    assert(allResourceInfo.get(gpuCustomName).nonEmpty)
-    assert(allResourceInfo.get(gpuCustomName).get === 3)
-    assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
-    assert(allResourceInfo.get(fpgaCustomName).get === 3)
+      val yarnRInfo = newContext.getResource.getResources
+      val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
+      assert(allResourceInfo.get(gpuCustomName).nonEmpty)
+      assert(allResourceInfo.get(gpuCustomName).get === 3)
+      assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
+      assert(allResourceInfo.get(fpgaCustomName).get === 3)
+    }
   }
 
   test("test yarn jars path not exists") {
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 53b6d192e72..56ec5a801e4 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -28,7 +28,9 @@ import org.apache.spark.deploy.yarn.config._
 import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, 
EXECUTOR_CORES, EXECUTOR_MEMORY}
 import org.apache.spark.resource.ResourceUtils.AMOUNT
 
-class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
+class ResourceRequestHelperSuite extends SparkFunSuite
+    with Matchers
+    with ResourceRequestTestHelper {
 
   private val CUSTOM_RES_1 = "custom-resource-type-1"
   private val CUSTOM_RES_2 = "custom-resource-type-2"
@@ -104,17 +106,16 @@ class ResourceRequestHelperSuite extends SparkFunSuite 
with Matchers {
       val requests = resources.map { case (rName, rValue, rUnit) =>
         (rName, rValue.toString + rUnit)
       }.toMap
-
-      ResourceRequestTestHelper.initializeResourceTypes(resourceDefs)
-
-      val resource = createResource()
-      setResourceRequests(requests, resource)
-
-      resources.foreach { case (rName, rValue, rUnit) =>
-        val requested = resource.getResourceInformation(rName)
-        assert(requested.getName === rName)
-        assert(requested.getValue === rValue)
-        assert(requested.getUnits === rUnit)
+      withResourceTypes(resourceDefs) {
+        val resource = createResource()
+        setResourceRequests(requests, resource)
+
+        resources.foreach { case (rName, rValue, rUnit) =>
+          val requested = resource.getResourceInformation(rName)
+          assert(requested.getName === rName)
+          assert(requested.getValue === rValue)
+          assert(requested.getUnits === rUnit)
+        }
       }
     }
   }
@@ -125,13 +126,13 @@ class ResourceRequestHelperSuite extends SparkFunSuite 
with Matchers {
     ("invalid unit", CUSTOM_RES_1, "123ppp")
   ).foreach { case (name, key, value) =>
     test(s"invalid request: $name") {
-      ResourceRequestTestHelper.initializeResourceTypes(Seq(key))
-
-      val resource = createResource()
-      val thrown = intercept[IllegalArgumentException] {
-        setResourceRequests(Map(key -> value), resource)
+      withResourceTypes(Seq(key)) {
+        val resource = createResource()
+        val thrown = intercept[IllegalArgumentException] {
+          setResourceRequests(Map(key -> value), resource)
+        }
+        thrown.getMessage should include (key)
       }
-      thrown.getMessage should include (key)
     }
   }
 
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
index 19c842f0928..826bded50f8 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestTestHelper.scala
@@ -22,7 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.hadoop.yarn.api.records.ResourceTypeInfo
 import org.apache.hadoop.yarn.util.resource.ResourceUtils
 
-object ResourceRequestTestHelper {
+trait ResourceRequestTestHelper {
   def initializeResourceTypes(resourceTypes: Seq[String]): Unit = {
     // ResourceUtils.reinitializeResources() is the YARN-way
     // to specify resources for the execution of the tests.
@@ -38,4 +38,15 @@ object ResourceRequestTestHelper {
 
     ResourceUtils.reinitializeResources(allResourceTypes.asJava)
   }
+
+  /**
+   * Calls `initializeResourceTypes` with the given inputs, runs `f` and
+   * restores `resourceTypes` to its default value.
+   */
+  def withResourceTypes(resourceTypes: Seq[String])(f: => Unit): Unit = {
+    initializeResourceTypes(resourceTypes)
+    try f finally {
+      initializeResourceTypes(Seq.empty)
+    }
+  }
 }
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index 055edfbf767..f6f2e1b11d5 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -62,7 +62,10 @@ class MockResolver extends 
SparkRackResolver(SparkHadoopUtil.get.conf) {
 
 }
 
-class YarnAllocatorSuite extends SparkFunSuite with Matchers with 
PrivateMethodTester {
+class YarnAllocatorSuite extends SparkFunSuite
+    with Matchers
+    with PrivateMethodTester
+    with ResourceRequestTestHelper {
   val conf = new YarnConfiguration()
   val sparkConf = new SparkConf()
   sparkConf.set(DRIVER_HOST_ADDRESS, "localhost")
@@ -191,135 +194,141 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with PrivateMethodT
 
   test("single container allocated with ResourceProfile") {
     val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE))
-    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
-    // create default profile so we get a different id to test below
-    val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
-    val execReq = new ExecutorResourceRequests().resource("gpu", 6)
-    val taskReq = new TaskResourceRequests().resource("gpu", 1)
-    val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
-    // request a single container and receive it
-    val (handler, _) = createAllocator(0)
+    withResourceTypes(yarnResources) {
+      // create default profile so we get a different id to test below
+      val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+      val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+      val taskReq = new TaskResourceRequests().resource("gpu", 1)
+      val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+      // request a single container and receive it
+      val (handler, _) = createAllocator(0)
+
+      val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, 
rprof -> 1)
+      val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id 
-> 0)
+      
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+        numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
 
-    val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof 
-> 1)
-    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id 
-> 0)
-    
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
-      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
+      handler.updateResourceRequests()
+      handler.getNumExecutorsRunning should be (0)
+      handler.getNumContainersPendingAllocate should be (1)
 
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getNumContainersPendingAllocate should be (1)
+      val container = createContainer("host1", priority = 
Priority.newInstance(rprof.id))
+      handler.handleAllocatedContainers(Array(container))
 
-    val container = createContainer("host1", priority = 
Priority.newInstance(rprof.id))
-    handler.handleAllocatedContainers(Array(container))
+      handler.getNumExecutorsRunning should be (1)
+      handler.allocatedContainerToHostMap.get(container.getId).get should be 
("host1")
+      val hostTocontainer = 
handler.allocatedHostToContainersMapPerRPId(rprof.id)
+      hostTocontainer.get("host1").get should contain(container.getId)
 
-    handler.getNumExecutorsRunning should be (1)
-    handler.allocatedContainerToHostMap.get(container.getId).get should be 
("host1")
-    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
-    hostTocontainer.get("host1").get should contain(container.getId)
-
-    val size = rmClient.getMatchingRequests(container.getPriority, "host1", 
containerResource).size
-    size should be (0)
+      val size =
+        rmClient.getMatchingRequests(container.getPriority, "host1", 
containerResource).size
+      size should be (0)
 
-    ResourceProfile.reInitDefaultProfile(sparkConf)
+      ResourceProfile.reInitDefaultProfile(sparkConf)
+    }
   }
 
   test("multiple containers allocated with ResourceProfiles") {
     val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), 
sparkConf.get(YARN_FPGA_DEVICE))
-    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
-    // create default profile so we get a different id to test below
-    val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
-    val execReq = new ExecutorResourceRequests().resource("gpu", 6)
-    val taskReq = new TaskResourceRequests().resource("gpu", 1)
-    val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
+    withResourceTypes(yarnResources) {
+      // create default profile so we get a different id to test below
+      val defaultRProf = ResourceProfile.getOrCreateDefaultProfile(sparkConf)
+      val execReq = new ExecutorResourceRequests().resource("gpu", 6)
+      val taskReq = new TaskResourceRequests().resource("gpu", 1)
+      val rprof = new ResourceProfile(execReq.requests, taskReq.requests)
 
-    val execReq2 = new 
ExecutorResourceRequests().memory("8g").resource("fpga", 2)
-    val taskReq2 = new TaskResourceRequests().resource("fpga", 1)
-    val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests)
+      val execReq2 = new 
ExecutorResourceRequests().memory("8g").resource("fpga", 2)
+      val taskReq2 = new TaskResourceRequests().resource("fpga", 1)
+      val rprof2 = new ResourceProfile(execReq2.requests, taskReq2.requests)
 
 
-    // request a single container and receive it
-    val (handler, _) = createAllocator(1)
-    val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, rprof 
-> 1, rprof2 -> 2)
-    val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id 
-> 0, rprof2.id -> 0)
-    
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
-      numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
-
-    handler.updateResourceRequests()
-    handler.getNumExecutorsRunning should be (0)
-    handler.getNumContainersPendingAllocate should be (3)
-
-    val containerResourcerp2 = Resource.newInstance(10240, 5)
-
-    val container = createContainer("host1", priority = 
Priority.newInstance(rprof.id))
-    val container2 = createContainer("host2", resource = containerResourcerp2,
-      priority = Priority.newInstance(rprof2.id))
-    val container3 = createContainer("host3", resource = containerResourcerp2,
-      priority = Priority.newInstance(rprof2.id))
-    handler.handleAllocatedContainers(Array(container, container2, container3))
+      // request a single container and receive it
+      val (handler, _) = createAllocator(1)
+      val resourceProfileToTotalExecs = mutable.HashMap(defaultRProf -> 0, 
rprof -> 1, rprof2 -> 2)
+      val numLocalityAwareTasksPerResourceProfileId = mutable.HashMap(rprof.id 
-> 0, rprof2.id -> 0)
+      
handler.requestTotalExecutorsWithPreferredLocalities(resourceProfileToTotalExecs.toMap,
+        numLocalityAwareTasksPerResourceProfileId.toMap, Map.empty, Set.empty)
 
-    handler.getNumExecutorsRunning should be (3)
-    handler.allocatedContainerToHostMap.get(container.getId).get should be 
("host1")
-    handler.allocatedContainerToHostMap.get(container2.getId).get should be 
("host2")
-    handler.allocatedContainerToHostMap.get(container3.getId).get should be 
("host3")
-
-    val hostTocontainer = handler.allocatedHostToContainersMapPerRPId(rprof.id)
-    hostTocontainer.get("host1").get should contain(container.getId)
-    val hostTocontainer2 = 
handler.allocatedHostToContainersMapPerRPId(rprof2.id)
-    hostTocontainer2.get("host2").get should contain(container2.getId)
-    hostTocontainer2.get("host3").get should contain(container3.getId)
-
-    val size = rmClient.getMatchingRequests(container.getPriority, "host1", 
containerResource).size
-    size should be (0)
-
-    ResourceProfile.reInitDefaultProfile(sparkConf)
+      handler.updateResourceRequests()
+      handler.getNumExecutorsRunning should be (0)
+      handler.getNumContainersPendingAllocate should be (3)
+
+      val containerResourcerp2 = Resource.newInstance(10240, 5)
+
+      val container = createContainer("host1", priority = 
Priority.newInstance(rprof.id))
+      val container2 = createContainer("host2", resource = 
containerResourcerp2,
+        priority = Priority.newInstance(rprof2.id))
+      val container3 = createContainer("host3", resource = 
containerResourcerp2,
+        priority = Priority.newInstance(rprof2.id))
+      handler.handleAllocatedContainers(Array(container, container2, 
container3))
+
+      handler.getNumExecutorsRunning should be (3)
+      handler.allocatedContainerToHostMap.get(container.getId).get should be 
("host1")
+      handler.allocatedContainerToHostMap.get(container2.getId).get should be 
("host2")
+      handler.allocatedContainerToHostMap.get(container3.getId).get should be 
("host3")
+
+      val hostTocontainer = 
handler.allocatedHostToContainersMapPerRPId(rprof.id)
+      hostTocontainer.get("host1").get should contain(container.getId)
+      val hostTocontainer2 = 
handler.allocatedHostToContainersMapPerRPId(rprof2.id)
+      hostTocontainer2.get("host2").get should contain(container2.getId)
+      hostTocontainer2.get("host3").get should contain(container3.getId)
+
+      val size =
+        rmClient.getMatchingRequests(container.getPriority, "host1", 
containerResource).size
+      size should be (0)
+
+      ResourceProfile.reInitDefaultProfile(sparkConf)
+    }
   }
 
   test("custom resource requested from yarn") {
-    ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
-
-    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val (handler, _) = createAllocator(1, mockAmClient,
-      Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
-
-    handler.updateResourceRequests()
-    val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
-    val container = createContainer("host1", resource = defaultResource)
-    handler.handleAllocatedContainers(Array(container))
-
-    // get amount of memory and vcores from resource, so effectively skipping 
their validation
-    val expectedResources = 
Resource.newInstance(defaultResource.getMemorySize(),
-      defaultResource.getVirtualCores)
-    setResourceRequests(Map("gpu" -> "2G"), expectedResources)
-    val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
+    withResourceTypes(List("gpu")) {
+      val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+      val (handler, _) = createAllocator(1, mockAmClient,
+        Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
 
-    verify(mockAmClient).addContainerRequest(captor.capture())
-    val containerRequest: ContainerRequest = captor.getValue
-    assert(containerRequest.getCapability === expectedResources)
+      handler.updateResourceRequests()
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val container = createContainer("host1", resource = defaultResource)
+      handler.handleAllocatedContainers(Array(container))
+
+      // get amount of memory and vcores from resource, so effectively 
skipping their validation
+      val expectedResources = 
Resource.newInstance(defaultResource.getMemorySize(),
+        defaultResource.getVirtualCores)
+      setResourceRequests(Map("gpu" -> "2G"), expectedResources)
+      val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
+
+      verify(mockAmClient).addContainerRequest(captor.capture())
+      val containerRequest: ContainerRequest = captor.getValue
+      assert(containerRequest.getCapability === expectedResources)
+    }
   }
 
   test("custom spark resource mapped to yarn resource configs") {
     val yarnMadeupResource = "yarn.io/madeup"
     val yarnResources = Seq(sparkConf.get(YARN_GPU_DEVICE), 
sparkConf.get(YARN_FPGA_DEVICE),
       yarnMadeupResource)
-    ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
-    val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-    val madeupConfigName = 
s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}"
-    val sparkResources =
-      Map(EXECUTOR_GPU_ID.amountConf -> "3",
-        EXECUTOR_FPGA_ID.amountConf -> "2",
-        madeupConfigName -> "5")
-    val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
+    withResourceTypes(yarnResources) {
+      val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+      val madeupConfigName =
+        
s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}"
+      val sparkResources =
+        Map(EXECUTOR_GPU_ID.amountConf -> "3",
+          EXECUTOR_FPGA_ID.amountConf -> "2",
+          madeupConfigName -> "5")
+      val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
 
-    handler.updateResourceRequests()
-    val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
-    val yarnRInfo = defaultResource.getResources
-    val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.getName -> 
rInfo.getValue) ).toMap
-    assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty)
-    assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3)
-    assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty)
-    assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).get === 2)
-    assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
-    assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+      handler.updateResourceRequests()
+      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+      val yarnRInfo = defaultResource.getResources
+      val allResourceInfo = yarnRInfo.map( rInfo => (rInfo.getName -> 
rInfo.getValue) ).toMap
+      assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).nonEmpty)
+      assert(allResourceInfo.get(sparkConf.get(YARN_GPU_DEVICE)).get === 3)
+      assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).nonEmpty)
+      assert(allResourceInfo.get(sparkConf.get(YARN_FPGA_DEVICE)).get === 2)
+      assert(allResourceInfo.get(yarnMadeupResource).nonEmpty)
+      assert(allResourceInfo.get(yarnMadeupResource).get === 5)
+    }
   }
 
   test("gpu/fpga spark resource mapped to custom yarn resource") {
@@ -331,21 +340,22 @@ class YarnAllocatorSuite extends SparkFunSuite with 
Matchers with PrivateMethodT
       sparkConf.set(YARN_GPU_DEVICE.key, gpuCustomName)
       sparkConf.set(YARN_FPGA_DEVICE.key, fpgaCustomName)
       val yarnResources = Seq(gpuCustomName, fpgaCustomName)
-      ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
-      val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
-      val sparkResources =
-        Map(EXECUTOR_GPU_ID.amountConf -> "3",
-          EXECUTOR_FPGA_ID.amountConf -> "2")
-      val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
-
-      handler.updateResourceRequests()
-      val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
-      val yarnRInfo = defaultResource.getResources
-      val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
-      assert(allResourceInfo.get(gpuCustomName).nonEmpty)
-      assert(allResourceInfo.get(gpuCustomName).get === 3)
-      assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
-      assert(allResourceInfo.get(fpgaCustomName).get === 2)
+      withResourceTypes(yarnResources) {
+        val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+        val sparkResources =
+          Map(EXECUTOR_GPU_ID.amountConf -> "3",
+            EXECUTOR_FPGA_ID.amountConf -> "2")
+        val (handler, _) = createAllocator(1, mockAmClient, sparkResources)
+
+        handler.updateResourceRequests()
+        val defaultResource = handler.rpIdToYarnResource.get(defaultRPId)
+        val yarnRInfo = defaultResource.getResources
+        val allResourceInfo = yarnRInfo.map(rInfo => (rInfo.getName -> 
rInfo.getValue)).toMap
+        assert(allResourceInfo.get(gpuCustomName).nonEmpty)
+        assert(allResourceInfo.get(gpuCustomName).get === 3)
+        assert(allResourceInfo.get(fpgaCustomName).nonEmpty)
+        assert(allResourceInfo.get(fpgaCustomName).get === 2)
+      }
     } finally {
       sparkConf.set(YARN_GPU_DEVICE.key, originalGpu)
       sparkConf.set(YARN_FPGA_DEVICE.key, originalFpga)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

