Repository: spark
Updated Branches:
  refs/heads/branch-2.3 551ccfba5 -> 317b0aaed


[SPARK-22587] Spark job fails if fs.defaultFS and application jar are different url

## What changes were proposed in this pull request?

The comparison of two filesystems does not consider the authority of the URI.
This matters specifically for the WASB file storage system, where the userInfo
part of the URI is used to distinguish filesystems. For example,
wasbs://user1@xyz.net and wasbs://user2@xyz.net should be treated as two
different filesystems. Therefore, the authority must be included when comparing
two filesystems: two filesystems with different authorities cannot be the same FS.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Mingjie Tang <mt...@hortonworks.com>

Closes #19885 from merlintang/EAR-7377.

(cherry picked from commit a6647ffbf7a312a3e119a9beef90880cc915aa60)
Signed-off-by: jerryshao <ss...@hortonworks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/317b0aae
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/317b0aae
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/317b0aae

Branch: refs/heads/branch-2.3
Commit: 317b0aaed83e4bbf66f63ddc0d618da9f1f85085
Parents: 551ccfb
Author: Mingjie Tang <mt...@hortonworks.com>
Authored: Thu Jan 11 11:51:03 2018 +0800
Committer: jerryshao <ss...@hortonworks.com>
Committed: Thu Jan 11 11:51:34 2018 +0800

----------------------------------------------------------------------
 .../org/apache/spark/deploy/yarn/Client.scala   | 24 +++++++++++---
 .../apache/spark/deploy/yarn/ClientSuite.scala  | 33 ++++++++++++++++++++
 2 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/317b0aae/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 15328d0..8cd3cd9 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -1421,15 +1421,20 @@ private object Client extends Logging {
   }
 
   /**
-   * Return whether the two file systems are the same.
+   * Return whether two URI represent file system are the same
    */
-  private def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
-    val srcUri = srcFs.getUri()
-    val dstUri = destFs.getUri()
+  private[spark] def compareUri(srcUri: URI, dstUri: URI): Boolean = {
+
     if (srcUri.getScheme() == null || srcUri.getScheme() != 
dstUri.getScheme()) {
       return false
     }
 
+    val srcAuthority = srcUri.getAuthority()
+    val dstAuthority = dstUri.getAuthority()
+    if (srcAuthority != null && !srcAuthority.equalsIgnoreCase(dstAuthority)) {
+      return false
+    }
+
     var srcHost = srcUri.getHost()
     var dstHost = dstUri.getHost()
 
@@ -1447,6 +1452,17 @@ private object Client extends Logging {
     }
 
     Objects.equal(srcHost, dstHost) && srcUri.getPort() == dstUri.getPort()
+
+  }
+
+  /**
+   * Return whether the two file systems are the same.
+   */
+  protected def compareFs(srcFs: FileSystem, destFs: FileSystem): Boolean = {
+    val srcUri = srcFs.getUri()
+    val dstUri = destFs.getUri()
+
+    compareUri(srcUri, dstUri)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/317b0aae/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
----------------------------------------------------------------------
diff --git 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index 9d5f5eb..7fa5971 100644
--- 
a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ 
b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -357,6 +357,39 @@ class ClientSuite extends SparkFunSuite with Matchers {
     sparkConf.get(SECONDARY_JARS) should be (Some(Seq(new 
File(jar2.toURI).getName)))
   }
 
+  private val matching = Seq(
+    ("files URI match test1", "file:///file1", "file:///file2"),
+    ("files URI match test2", "file:///c:file1", "file://c:file2"),
+    ("files URI match test3", "file://host/file1", "file://host/file2"),
+    ("wasb URI match test", "wasb://bucket1@user", "wasb://bucket1@user/"),
+    ("hdfs URI match test", "hdfs:/path1", "hdfs:/path1")
+  )
+
+  matching.foreach { t =>
+      test(t._1) {
+        assert(Client.compareUri(new URI(t._2), new URI(t._3)),
+          s"No match between ${t._2} and ${t._3}")
+      }
+  }
+
+  private val unmatching = Seq(
+    ("files URI unmatch test1", "file:///file1", "file://host/file2"),
+    ("files URI unmatch test2", "file://host/file1", "file:///file2"),
+    ("files URI unmatch test3", "file://host/file1", "file://host2/file2"),
+    ("wasb URI unmatch test1", "wasb://bucket1@user", "wasb://bucket2@user/"),
+    ("wasb URI unmatch test2", "wasb://bucket1@user", "wasb://bucket1@user2/"),
+    ("s3 URI unmatch test", "s3a://user@pass:bucket1/", 
"s3a://user2@pass2:bucket1/"),
+    ("hdfs URI unmatch test1", "hdfs://namenode1/path1", 
"hdfs://namenode1:8080/path2"),
+    ("hdfs URI unmatch test2", "hdfs://namenode1:8020/path1", 
"hdfs://namenode1:8080/path2")
+  )
+
+  unmatching.foreach { t =>
+      test(t._1) {
+        assert(!Client.compareUri(new URI(t._2), new URI(t._3)),
+          s"match between ${t._2} and ${t._3}")
+      }
+  }
+
   object Fixtures {
 
     val knownDefYarnAppCP: Seq[String] =


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to