This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f6a044c  [SPARK-37239][YARN][TESTS][FOLLOWUP] Add UT to cover `Client.prepareLocalResources` with custom `STAGING_FILE_REPLICATION`
f6a044c is described below

commit f6a044cf8cd83e6b3b30e515acbac0ec81607463
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Nov 9 08:07:38 2021 -0800

    [SPARK-37239][YARN][TESTS][FOLLOWUP] Add UT to cover `Client.prepareLocalResources` with custom `STAGING_FILE_REPLICATION`
    
    ### What changes were proposed in this pull request?
    This PR adds a new UT covering the `o.a.s.deploy.yarn.Client.prepareLocalResources` method with a custom `STAGING_FILE_REPLICATION` configuration, and changes other related UTs to explicitly verify that the `replication` passed into the `copyFileToRemote` method is `None`.
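
    For readers following the matcher change, here is a minimal, self-contained sketch of the difference between Mockito's `any()` and `meq(...)` that the updated UTs rely on. The `Uploader` trait below is hypothetical and exists only for illustration; it is not part of the Spark code base.

    ```scala
    import org.mockito.ArgumentMatchers.{any, eq => meq}
    import org.mockito.Mockito.{mock, verify, when}

    // Hypothetical trait, used only to illustrate the matcher change.
    trait Uploader {
      def upload(path: String, replication: Option[Short]): Boolean
    }

    val uploader = mock(classOf[Uploader])
    when(uploader.upload(any(), any())).thenReturn(true)

    uploader.upload("/tmp/file", None)

    // `any()` would match no matter what replication was passed; `meq(None)`
    // only matches when the call really passed None, which is the stricter
    // verification the updated UTs use for the default (unset) case.
    verify(uploader).upload(any(), meq(None))
    ```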
    
    ### Why are the changes needed?
    Add a new UT to improve test coverage of the `STAGING_FILE_REPLICATION` code path.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass the Jenkins or GitHub Actions checks.
    
    Closes #34531 from LuciferYang/SPARK-37239-followup.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../org/apache/spark/deploy/yarn/ClientSuite.scala | 35 +++++++++++++++++-----
 1 file changed, 27 insertions(+), 8 deletions(-)

diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 58e49c9..a8815dc 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -132,7 +132,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
       .set("spark.yarn.dist.jars", ADDED)
     val client = createClient(sparkConf, args = Array("--jar", USER))
     doReturn(new Path("/")).when(client).copyFileToRemote(any(classOf[Path]),
-      any(classOf[Path]), any(), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+      any(classOf[Path]), meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
 
     val tempDir = Utils.createTempDir()
     try {
@@ -308,12 +308,12 @@ class ClientSuite extends SparkFunSuite with Matchers {
     assert(sparkConf.get(SPARK_JARS) ===
       Some(Seq(s"local:${jar4.getPath()}", s"local:${single.getAbsolutePath()}/*")))
 
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())), any(),
-      any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())), any(),
-      any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())), any(),
-      any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar1.toURI())),
+      meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar2.toURI())),
+      meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(jar3.toURI())),
+      meq(None), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
 
     val cp = classpath(client)
     cp should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
@@ -330,7 +330,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
     val client = createClient(sparkConf)
     client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
 
-    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), any(),
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())), meq(None),
       any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
     classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
 
@@ -340,6 +340,25 @@ class ClientSuite extends SparkFunSuite with Matchers {
     }
   }
 
+  test("SPARK-37239: distribute jars archive with set 
STAGING_FILE_REPLICATION") {
+    val temp = Utils.createTempDir()
+    val archive = TestUtils.createJarWithFiles(Map(), temp)
+    val replication = 5
+
+    val sparkConf = new SparkConf()
+      .set(SPARK_ARCHIVE, archive.getPath())
+      .set(STAGING_FILE_REPLICATION, replication)
+    val client = createClient(sparkConf)
+    client.prepareLocalResources(new Path(temp.getAbsolutePath()), Nil)
+
+    // It is difficult to assert the result of `setReplication` in a UT because this method in
+    // `RawLocalFileSystem` always returns true and does not change the replication value.
+    // So we can only assert that `client.copyFileToRemote` received a non-`None` replication.
+    verify(client).copyFileToRemote(any(classOf[Path]), meq(new Path(archive.toURI())),
+      meq(Some(replication.toShort)), any(classOf[MutableHashMap[URI, Path]]), anyBoolean(), any())
+    classpath(client) should contain (buildPath(PWD, LOCALIZED_LIB_DIR, "*"))
+  }
+
   test("distribute archive multiple times") {
     val libs = Utils.createTempDir()
     // Create jars dir and RELEASE file to avoid IllegalStateException.
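
For context on the comment in the new test about `setReplication` on `RawLocalFileSystem`, here is a minimal sketch of how an Option-typed replication factor could be applied to a destination `FileSystem`. It illustrates the pattern only and is not a copy of `Client.copyFileToRemote`; the helper name `applyReplication` is made up for this example.

```scala
import java.nio.file.Files

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Illustrative helper: apply a replication factor to an uploaded file only when
// one was configured; when the Option is None, the file system default applies.
def applyReplication(destFs: FileSystem, destPath: Path, replication: Option[Short]): Unit = {
  // On the local file system, setReplication reports success without changing
  // anything, which is why the UT above can only assert the argument passed in,
  // not the resulting replication factor.
  replication.foreach(r => destFs.setReplication(destPath, r))
}

// Example usage against the local file system.
val tmp = Files.createTempFile("example", ".jar")
val fs = FileSystem.getLocal(new Configuration())
applyReplication(fs, new Path(tmp.toUri), Some(5.toShort))
```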
