[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-02 Thread mgaido91
GitHub user mgaido91 opened a pull request:

https://github.com/apache/spark/pull/21216

[SPARK-24149][YARN] Retrieve all federated namespaces tokens

## What changes were proposed in this pull request?

Hadoop 3 introduces HDFS federation, which allows multiple namespaces on the same HDFS cluster. In Spark, we need to request a delegation token from the namenodes of all the namespaces; otherwise, accessing any namespace other than the default one (for which we already fetch a delegation token) fails.

The PR adds automatic discovery of the namenodes for all the namespaces configured in hdfs-site.xml.
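
For illustration, here is a minimal sketch of the discovery idea, using the standard federation keys `dfs.nameservices` and `dfs.namenode.rpc-address.<ns>` from hdfs-site.xml (a hedged sketch, not the exact patch code; the object and method names are invented for the example):

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object FederatedNamenodeDiscovery {
  // Hypothetical helper: one FileSystem per namespace configured for federation.
  def filesystemsForNamespaces(hadoopConf: Configuration): Set[FileSystem] = {
    // dfs.nameservices lists the federated namespaces, e.g. "ns1,ns2".
    Option(hadoopConf.get("dfs.nameservices")).toSeq
      .flatMap(_.split(",").map(_.trim).filter(_.nonEmpty))
      // Each namespace exposes its namenode via dfs.namenode.rpc-address.<ns>.
      .flatMap(ns => Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")))
      .map(addr => new Path(s"hdfs://$addr").getFileSystem(hadoopConf))
      .toSet
  }
}
```

Delegation tokens for these filesystems can then be requested the same way Spark already does for the default filesystem.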

## How was this patch tested?

Manual tests in a dockerized environment.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/mgaido91/spark SPARK-24149

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21216.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21216


commit dfdd957c15a43bb601b0ca287b7a84e6c326c4c0
Author: Marco Gaido 
Date:   2018-04-29T08:56:29Z

[SPARK-24149][YARN] Retrieve all federated namespaces tokens




---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-04 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r186015828
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -196,11 +196,17 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .toSet

+// add the list of available namenodes for all namespaces in HDFS federation
+val hadoopFilesystems = Option(hadoopConf.get("dfs.nameservices"))
+  .toSeq.flatMap(_.split(","))
+  .map(ns => hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
--- End diff --

Will this configuration "dfs.namenode.rpc-address.xxx" always exist? Shall we check whether it is null or not?


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r186024263
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -196,11 +196,17 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .toSet

+// add the list of available namenodes for all namespaces in HDFS federation
+val hadoopFilesystems = Option(hadoopConf.get("dfs.nameservices"))
+  .toSeq.flatMap(_.split(","))
+  .map(ns => hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
--- End diff --

If that namespace is listed in the `dfs.nameservices` config, this should exist; otherwise it is not a valid configuration. Shall we check for null in case we get an invalid config?


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-04 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r186059087
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -196,11 +196,17 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .toSet

+// add the list of available namenodes for all namespaces in HDFS federation
+val hadoopFilesystems = Option(hadoopConf.get("dfs.nameservices"))
+  .toSeq.flatMap(_.split(","))
+  .map(ns => hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
--- End diff --

I think it is good to check for null; at the very least it does no harm to the current code.
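
For illustration, a minimal sketch of the null check being discussed, assuming a Hadoop `Configuration` named `hadoopConf` (a sketch, not the exact patch code):

```
import org.apache.hadoop.conf.Configuration

val hadoopConf = new Configuration()  // stands in for the conf used in the patch

val nameservices = Option(hadoopConf.get("dfs.nameservices"))
  .toSeq.flatMap(_.split(","))

// Without a check: get() returns null for a missing key, so the result may contain nulls.
val withNulls = nameservices.map(ns => hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))

// With a check: wrapping in Option and using flatMap silently drops
// namespaces that have no rpc-address configured.
val withoutNulls = nameservices.flatMap { ns =>
  Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
}
```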


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r187134476
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .getOrElse(FileSystem.get(hadoopConf))

-filesystemsToAccess + stagingFS
+// add the list of available namenodes for all namespaces in HDFS federation
+// if ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens
+// for its namespaces
+val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+  Set.empty
+} else {
+  Option(hadoopConf.get("dfs.nameservices"))
+.toSeq.flatMap(_.split(","))
+.flatMap(ns => Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")))
--- End diff --

style is `.foo { bar => ... }`

This also will not work for HA, since there's no direct "rpc-address" like 
this in that case, and you need to use the namespace URI.

You should probably filter out the staging dir FS in that case, too, 
although maybe it's already taken care of (since `filesystemsToAccess` is a 
set).
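
For context, a hedged sketch of how the HA case differs, based on the standard HDFS HA keys (`dfs.ha.namenodes.<ns>` plus per-namenode `dfs.namenode.rpc-address.<ns>.<nnid>`); this illustrates the point, it is not the patch code:

```
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

val hadoopConf = new Configuration()  // stands in for the conf used in the patch

// One FileSystem per nameservice, covering both HA and non-HA namespaces.
val filesystems: Set[FileSystem] =
  hadoopConf.getTrimmedStrings("dfs.nameservices").toSeq.flatMap { ns =>
    if (hadoopConf.get(s"dfs.ha.namenodes.$ns") != null) {
      // HA: namenodes live under dfs.namenode.rpc-address.<ns>.<nnid>, so there is
      // no single rpc-address; resolve through the logical nameservice URI instead.
      Some(new Path(s"hdfs://$ns").getFileSystem(hadoopConf))
    } else {
      // Non-HA: a single rpc-address is defined directly for the nameservice.
      Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns"))
        .map(addr => new Path(s"hdfs://$addr").getFileSystem(hadoopConf))
    }
  }.toSet
```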


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r187134360
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .getOrElse(FileSystem.get(hadoopConf))

-filesystemsToAccess + stagingFS
+// add the list of available namenodes for all namespaces in HDFS federation
+// if ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens
+// for its namespaces
+val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+  Set.empty
+} else {
+  Option(hadoopConf.get("dfs.nameservices"))
--- End diff --

`hadoopConf.getTrimmedStrings`?
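
For context, a small hedged comparison of the two approaches (values are illustrative): `Configuration.getTrimmedStrings` splits a comma-separated property, trims each element, and returns an empty array when the key is absent, so the null check and manual split are no longer needed.

```
val hadoopConf = new org.apache.hadoop.conf.Configuration()

// Manual handling: null check plus split, whitespace not trimmed.
val manual: Seq[String] = Option(hadoopConf.get("dfs.nameservices"))
  .toSeq.flatMap(_.split(","))

// Suggested: empty Array[String] when unset, each element trimmed,
// e.g. "ns1, ns2" -> Array("ns1", "ns2").
val trimmed: Seq[String] = hadoopConf.getTrimmedStrings("dfs.nameservices").toSeq
```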


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-09 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r187134147
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .getOrElse(FileSystem.get(hadoopConf))

-filesystemsToAccess + stagingFS
+// add the list of available namenodes for all namespaces in HDFS federation
--- End diff --

Nit: needs some punctuation, and sentences should start with capital letters.


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-11 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r187570029
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,19 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .getOrElse(FileSystem.get(hadoopConf))

-filesystemsToAccess + stagingFS
+// add the list of available namenodes for all namespaces in HDFS federation
+// if ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens
+// for its namespaces
+val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+  Set.empty
+} else {
+  Option(hadoopConf.get("dfs.nameservices"))
+.toSeq.flatMap(_.split(","))
+.flatMap(ns => Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")))
--- End diff --

Yes, you are right about HA, thanks. I am working on making it work for HA as well; I will update ASAP.

> You should probably filter out the staging dir FS in that case, too, although maybe it's already taken care of (since filesystemsToAccess is a set).

Yes, it is already taken care of since it is a set. I have also tested this in the UT I added.




---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-16 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r188792203
  
--- Diff: docs/running-on-yarn.md ---
@@ -426,8 +426,10 @@ To use a custom metrics.properties for the application master and executors, upd
 Standard Kerberos support in Spark is covered in the [Security](security.html#kerberos) page.

 In YARN mode, when accessing Hadoop file systems, aside from the service hosting the user's home
--- End diff --

Not your fault, but this doesn't seem accurate given the code, which doesn't seem to look at the home directory at all.


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-18 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r189240461
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,31 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .getOrElse(FileSystem.get(hadoopConf))

-filesystemsToAccess + stagingFS
+// Add the list of available namenodes for all namespaces in HDFS federation.
+// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+// namespaces.
+val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+  Set.empty
+} else {
+  val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+  // Retrieving the filesystem for the nameservices where HA is not enabled
+  val filesystemsWithoutHA = nameservices.flatMap { ns =>
+hadoopConf.get(s"dfs.namenode.rpc-address.$ns") match {
+  case null => None
+  case nameNode => Some(new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf))
--- End diff --

Maybe we can change to `Option(hadoopConf.get()).map {xxx}` for 
simplicity.
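
A hedged sketch of what that simplification could look like for the non-HA branch (reusing the names from the quoted diff; not the merged code):

```
val hadoopConf = new org.apache.hadoop.conf.Configuration()  // as in the quoted diff
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")

// Option(...).map replaces the match on null: a missing rpc-address
// simply yields no filesystem for that namespace.
val filesystemsWithoutHA = nameservices.flatMap { ns =>
  Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map { nameNode =>
    new org.apache.hadoop.fs.Path(s"hdfs://$nameNode").getFileSystem(hadoopConf)
  }
}
```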


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-18 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r189240503
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,31 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .getOrElse(FileSystem.get(hadoopConf))

-filesystemsToAccess + stagingFS
+// Add the list of available namenodes for all namespaces in HDFS federation.
+// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+// namespaces.
+val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+  Set.empty
+} else {
+  val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+  // Retrieving the filesystem for the nameservices where HA is not enabled
+  val filesystemsWithoutHA = nameservices.flatMap { ns =>
+hadoopConf.get(s"dfs.namenode.rpc-address.$ns") match {
+  case null => None
+  case nameNode => Some(new Path(s"hdfs://$nameNode").getFileSystem(hadoopConf))
+}
+  }
+  // Retrieving the filesystem for the nameservices where HA is enabled
+  val filesystemsWithHA = nameservices.flatMap { ns =>
+hadoopConf.get(s"dfs.ha.namenodes.$ns") match {
+  case null => None
+  case _ => Some(new Path(s"hdfs://$ns").getFileSystem(hadoopConf))
--- End diff --

Also here.
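
Likewise, a hedged sketch of the same simplification for the HA branch (only the presence of `dfs.ha.namenodes.<ns>` matters, so the mapped value is ignored; not the merged code):

```
val hadoopConf = new org.apache.hadoop.conf.Configuration()  // as in the quoted diff
val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")

val filesystemsWithHA = nameservices.flatMap { ns =>
  // The key only signals that HA is configured for this nameservice;
  // the filesystem is resolved through the logical URI hdfs://<ns>.
  Option(hadoopConf.get(s"dfs.ha.namenodes.$ns")).map { _ =>
    new org.apache.hadoop.fs.Path(s"hdfs://$ns").getFileSystem(hadoopConf)
  }
}
```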


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-18 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/21216#discussion_r189255656
  
--- Diff: resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala ---
@@ -200,7 +200,27 @@ object YarnSparkHadoopUtil {
   .map(new Path(_).getFileSystem(hadoopConf))
   .getOrElse(FileSystem.get(hadoopConf))

-filesystemsToAccess + stagingFS
+// Add the list of available namenodes for all namespaces in HDFS federation.
+// If ViewFS is enabled, this is skipped as ViewFS already handles delegation tokens for its
+// namespaces.
+val hadoopFilesystems = if (stagingFS.getScheme == "viewfs") {
+  Set.empty
+} else {
+  val nameservices = hadoopConf.getTrimmedStrings("dfs.nameservices")
+  // Retrieving the filesystem for the nameservices where HA is not enabled
+  val filesystemsWithoutHA = nameservices.flatMap { ns =>
+Option(hadoopConf.get(s"dfs.namenode.rpc-address.$ns")).map(nameNode =>
--- End diff --

I think you should use `{}` if this `map` is split across two lines:

```
Option(xxx).map { xx =>
  foo
}
```


---

[GitHub] spark pull request #21216: [SPARK-24149][YARN] Retrieve all federated namesp...

2018-05-18 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21216


---
