[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...
GitHub user mgaido91 opened a pull request:

    https://github.com/apache/spark/pull/21216

    [SPARK-24149][YARN] Retrieve all federated namespaces tokens

    ## What changes were proposed in this pull request?

    Hadoop 3 introduces HDFS federation, which allows multiple namespaces on the
    same HDFS cluster. In Spark, we need to request a delegation token from the
    namenodes of every namespace; otherwise, accessing any namespace other than
    the default one (for which we already fetch the delegation token) fails.
    This PR adds automatic discovery of all the namenodes for all the namespaces
    available, according to the configs in hdfs-site.xml.

    ## How was this patch tested?

    Manual tests in a dockerized env.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/mgaido91/spark SPARK-24149

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21216.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21216

commit dfdd957c15a43bb601b0ca287b7a84e6c326c4c0
Author: Marco Gaido
Date: 2018-04-29T08:56:29Z

    [SPARK-24149][YARN] Retrieve all federated namespaces tokens

---

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
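The discovery the PR describes can be sketched in a few lines of self-contained Scala. A plain Map stands in for the Hadoop Configuration (an assumption for illustration only); the keys mirror the hdfs-site.xml properties the patch reads, and only non-HA nameservices are covered here:

```scala
// Minimal sketch of the namenode-discovery logic described in the PR.
// Assumption: a Map[String, String] stands in for org.apache.hadoop.conf.Configuration,
// and each nameservice has a plain (non-HA) rpc-address entry.
object FederatedNamenodeDiscovery {
  def namenodeUris(conf: Map[String, String]): Seq[String] =
    conf.get("dfs.nameservices")                       // e.g. "ns1,ns2"
      .toSeq
      .flatMap(_.split(",").map(_.trim))               // one entry per namespace
      .flatMap(ns => conf.get(s"dfs.namenode.rpc-address.$ns"))
      .map(addr => s"hdfs://$addr")                    // URI to fetch a token for

  def main(args: Array[String]): Unit = {
    val conf = Map(
      "dfs.nameservices" -> "ns1,ns2",
      "dfs.namenode.rpc-address.ns1" -> "nn1.example.com:8020",
      "dfs.namenode.rpc-address.ns2" -> "nn2.example.com:8020")
    println(namenodeUris(conf))
    // List(hdfs://nn1.example.com:8020, hdfs://nn2.example.com:8020)
  }
}
```

In the real patch, each URI would then be turned into a FileSystem so a delegation token can be requested from it.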
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r186015828

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -196,11 +196,17 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .toSet
+    // add the list of available namenodes for all namespaces in HDFS federation
+    val hadoopFilesystems = Option(hadoopConf.get("dfs.nameservices"))
+      .toSeq.flatMap(_.split(","))
+      .map(ns => hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
--- End diff --

Will the configuration "dfs.namenode.rpc-address.xxx" always exist? Shall we check whether it is null?
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r186024263

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -196,11 +196,17 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .toSet
+    // add the list of available namenodes for all namespaces in HDFS federation
+    val hadoopFilesystems = Option(hadoopConf.get("dfs.nameservices"))
+      .toSeq.flatMap(_.split(","))
+      .map(ns => hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
--- End diff --

If that namespace is listed in the `dfs.nameservices` config, this should exist; otherwise it is not a valid configuration. Shall we check for null in case we get an invalid config?
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r186059087

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -196,11 +196,17 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .toSet
+    // add the list of available namenodes for all namespaces in HDFS federation
+    val hadoopFilesystems = Option(hadoopConf.get("dfs.nameservices"))
+      .toSeq.flatMap(_.split(","))
+      .map(ns => hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
--- End diff --

I think it is good to check for null; at the least, it does no harm to the current code.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r187134476

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .getOrElse(FileSystem.get(hadoopConf))
-    filesystemsToAccess + stagingFS
+    // add the list of available namenodes for all namespaces in HDFS federation
+    // if ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens
+    // for its namespaces
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      Option(hadoopConf.get("dfs.nameservices"))
+        .toSeq.flatMap(_.split(","))
+        .flatMap(ns => Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")))
--- End diff --

Style is `.foo { bar => ... }`.

This also will not work for HA, since there's no direct "rpc-address" like this in that case, and you need to use the namespace URI.

You should probably filter out the staging dir FS in that case, too, although maybe it's already taken care of (since `filesystemsToAccess` is a set).
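The HA case the reviewer raises can be sketched as follows. This is a self-contained illustration, not the merged code: a Map stands in for the Hadoop Configuration, and the names are illustrative. For an HA nameservice there is no single `dfs.namenode.rpc-address.<ns>`; instead `dfs.ha.namenodes.<ns>` lists the namenode ids, and the logical URI `hdfs://<ns>` must be used directly:

```scala
// Sketch of HA-aware discovery (assumption: Map stands in for the Hadoop Configuration).
object HaAwareDiscovery {
  def filesystemUris(conf: Map[String, String]): Seq[String] = {
    val nameservices = conf.get("dfs.nameservices").toSeq.flatMap(_.split(",").map(_.trim))

    // Non-HA: a concrete rpc-address exists, so address the namenode directly.
    val nonHa = nameservices.flatMap { ns =>
      conf.get(s"dfs.namenode.rpc-address.$ns").map(addr => s"hdfs://$addr")
    }

    // HA: dfs.ha.namenodes.<ns> exists, so use the logical namespace URI instead.
    val ha = nameservices.flatMap { ns =>
      conf.get(s"dfs.ha.namenodes.$ns").map(_ => s"hdfs://$ns")
    }

    nonHa ++ ha
  }
}
```

With an HA client configuration, resolving `hdfs://<ns>` is left to the HDFS client-side failover proxy, which is why the logical URI rather than a host:port is required.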
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r187134360

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .getOrElse(FileSystem.get(hadoopConf))
-    filesystemsToAccess + stagingFS
+    // add the list of available namenodes for all namespaces in HDFS federation
+    // if ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens
+    // for its namespaces
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      Option(hadoopConf.get("dfs.nameservices"))
--- End diff --

`hadoopConf.getTrimmedStrings`?
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r187134147

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .getOrElse(FileSystem.get(hadoopConf))
-    filesystemsToAccess + stagingFS
+    // add the list of available namenodes for all namespaces in HDFS federation
--- End diff --

nit: needs some punctuation, and sentences start with capital letters.
Github user mgaido91 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r187570029

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .getOrElse(FileSystem.get(hadoopConf))
-    filesystemsToAccess + stagingFS
+    // add the list of available namenodes for all namespaces in HDFS federation
+    // if ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens
+    // for its namespaces
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      Option(hadoopConf.get("dfs.nameservices"))
+        .toSeq.flatMap(_.split(","))
+        .flatMap(ns => Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")))
--- End diff --

Yes, you are right about HA, thanks. I am working on making it work for HA as well. I will update asap.

> You should probably filter out the staging dir FS in that case, too, although maybe it's already taken care of (since filesystemsToAccess is a set).

Yes, it is already taken care of since it is a set. I have also tested this in the UT I added.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r188792203

--- Diff: docs/running-on-yarn.md ---
@@ -426,8 +426,10 @@ To use a custom metrics.properties for the application master and executors, upd
 Standard Kerberos support in Spark is covered in the [Security](security.html#kerberos) page.

 In YARN mode, when accessing Hadoop file systems, aside from the service hosting the user's home
--- End diff --

Not your fault, but this doesn't seem accurate given the code, which doesn't seem to look at the home directory at all.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r189240461

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,31 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .getOrElse(FileSystem.get(hadoopConf))
-    filesystemsToAccess + stagingFS
+    // Add the list of available namenodes for all namespaces in HDFS federation.
+    // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+    // namespaces.
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+      // Retrieving the filesystem for the nameservices where HA is not enabled
+      val filesystemsWithoutHA = nameservices.flatMap { ns =>
+        hadoopConf.get(s"dfs.namenode.rpc-address.$ns") match {
+          case null => None
+          case nameNode => Some(new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf))
--- End diff --

Maybe we can change to `Option(hadoopConf.get()).map { xxx }` for simplicity.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r189240503

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,31 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .getOrElse(FileSystem.get(hadoopConf))
-    filesystemsToAccess + stagingFS
+    // Add the list of available namenodes for all namespaces in HDFS federation.
+    // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+    // namespaces.
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+      // Retrieving the filesystem for the nameservices where HA is not enabled
+      val filesystemsWithoutHA = nameservices.flatMap { ns =>
+        hadoopConf.get(s"dfs.namenode.rpc-address.$ns") match {
+          case null => None
+          case nameNode => Some(new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf))
+        }
+      }
+      // Retrieving the filesystem for the nameservices where HA is enabled
+      val filesystemsWithHA = nameservices.flatMap { ns =>
+        hadoopConf.get(s"dfs.ha.namenodes.$ns") match {
+          case null => None
+          case _ => Some(new Path(s"hdfs://$ns").getFileSystem(hadoopConf))
--- End diff --

Also here.
Github user jerryshao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21216#discussion_r189255656

--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,27 @@ object YarnSparkHadoopUtil {
         .map(new Path(_).getFileSystem(hadoopConf))
         .getOrElse(FileSystem.get(hadoopConf))
-    filesystemsToAccess + stagingFS
+    // Add the list of available namenodes for all namespaces in HDFS federation.
+    // If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+    // namespaces.
+    val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+      Set.empty
+    } else {
+      val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+      // Retrieving the filesystem for the nameservices where HA is not enabled
+      val filesystemsWithoutHA = nameservices.flatMap { ns =>
+        Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map(nameNode =>
--- End diff --

I think you should use `{}` if this `map` spans two lines:

```
Option(xxx).map { xx =>
  foo
}
```
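The brace convention the reviewer describes can be rendered as a self-contained snippet. The Map and names below are stand-ins for the Hadoop Configuration and FileSystem lookup in the real patch, used only so the style point compiles on its own:

```scala
// Sketch of the suggested brace style for a multi-line map
// (assumption: a Map[String, String] replaces the real Hadoop Configuration).
object BraceStyleExample {
  val hadoopConf: Map[String, String] = Map(
    "dfs.namenode.rpc-address.ns1" -> "nn1.example.com:8020")
  val nameservices: Seq[String] = Seq("ns1", "ns2")

  // Braces, not parentheses, when the function body spans multiple lines.
  val filesystemsWithoutHA: Seq[String] = nameservices.flatMap { ns =>
    hadoopConf.get(s"dfs.namenode.rpc-address.$ns").map { nameNode =>
      s"hdfs://$nameNode"
    }
  }
}
```

Single-line lambdas such as `.map(_.trim)` keep parentheses; the braces-with-newline form is reserved for bodies that do not fit on one line.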
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21216