Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e0c60f185 -> d0707c6ba


[SPARK-11227][CORE] UnknownHostException can be thrown when NameNode HA is enabled.

## What changes were proposed in this pull request?

If the following conditions are satisfied, executors do not load the properties in 
`hdfs-site.xml`, and an UnknownHostException can be thrown.

(1) NameNode HA is enabled.
(2) Event logging (`spark.eventLog.enabled`) is disabled, or the event log path is not on HDFS.
(3) Standalone or Mesos is used as the cluster manager.
(4) No code in the driver loads the `HdfsConfiguration` class, either directly or indirectly (see the sketch after this list).
(5) The tasks access HDFS.

(There might be some more conditions...)
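Regarding condition (4): as far as I can tell from the Hadoop source (an assumption on my part, not something stated in this patch), `org.apache.hadoop.hdfs.HdfsConfiguration` registers `hdfs-default.xml` and `hdfs-site.xml` as default resources in its static initializer, so merely loading the class in the driver makes the HA settings visible. A minimal sketch:

```
import org.apache.hadoop.conf.Configuration

// Loading the class runs its static initializer, which registers
// hdfs-default.xml and hdfs-site.xml as default Configuration resources
// (assumption based on Hadoop's HdfsConfiguration).
Class.forName("org.apache.hadoop.hdfs.HdfsConfiguration")

// A driver-side Configuration created afterwards can now resolve the
// HA nameservice.
val conf = new Configuration()
println(conf.get("dfs.nameservices")) // e.g. "hacluster" when HA is configured
```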

For example, the following code throws an UnknownHostException when the conditions 
above are satisfied.
```
sc.textFile("<path on HDFS>").collect
```

```
java.lang.IllegalArgumentException: java.net.UnknownHostException: hacluster
        at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:378)
        at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
        at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:678)
        at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:619)
        at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:149)
        at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2653)
        at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:92)
        at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2687)
        at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2669)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:371)
        at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:170)
        at org.apache.hadoop.mapred.JobConf.getWorkingDirectory(JobConf.java:656)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:438)
        at org.apache.hadoop.mapred.FileInputFormat.setInputPaths(FileInputFormat.java:411)
        at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986)
        at org.apache.spark.SparkContext$$anonfun$hadoopFile$1$$anonfun$32.apply(SparkContext.scala:986)
        at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177)
        at org.apache.spark.rdd.HadoopRDD$$anonfun$getJobConf$6.apply(HadoopRDD.scala:177)
        at scala.Option.map(Option.scala:146)
        at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:177)
        at org.apache.spark.rdd.HadoopRDD$$anon$1.<init>(HadoopRDD.scala:213)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:209)
        at org.apache.spark.rdd.HadoopRDD.compute(HadoopRDD.scala:102)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.UnknownHostException: hacluster
```

With this change, the same code no longer throws the exception, because `textFile` 
(via `hadoopFile`) now loads `HdfsConfiguration` indirectly before the configuration is broadcast.

```
sc.textFile("<path on HDFS>").collect
```

When a job includes operations that access HDFS, the `org.apache.hadoop.conf.Configuration` 
object is wrapped in a `SerializableConfiguration`, serialized, and broadcast from the 
driver to the executors. Each executor deserializes it with `loadDefaults = false`, so the 
HDFS-related properties must already be set in the configuration before it is broadcast.
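The gist of that serialization behavior, as a minimal sketch that round-trips a plain Hadoop `Configuration` instead of going through Spark's internal `SerializableConfiguration` class (names and flow are simplified here):

```
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import org.apache.hadoop.conf.Configuration

// Driver side: whatever properties the Configuration holds *right now* are
// written out; nothing else will be re-read on the other end.
val driverConf = new Configuration()   // loadDefaults = true on the driver
val buffer = new ByteArrayOutputStream()
driverConf.write(new DataOutputStream(buffer))

// Executor side: the Configuration is rebuilt with loadDefaults = false,
// so hdfs-site.xml is never consulted there.
val executorConf = new Configuration(false)
executorConf.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray)))

// If hdfs-site.xml was never registered on the driver before serialization,
// HA properties such as dfs.nameservices are simply missing here.
println(executorConf.get("dfs.nameservices"))
```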

## How was this patch tested?
Tested manually on my standalone cluster.

Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>

Closes #13738 from sarutak/SPARK-11227.

(cherry picked from commit 071eaaf9d2b63589f2e66e5279a16a5a484de6f5)
Signed-off-by: Tom Graves <tgra...@yahoo-inc.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0707c6b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0707c6b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0707c6b

Branch: refs/heads/branch-2.0
Commit: d0707c6baeb4003735a508f981111db370984354
Parents: e0c60f1
Author: Kousuke Saruta <saru...@oss.nttdata.co.jp>
Authored: Fri Aug 19 10:11:25 2016 -0500
Committer: Tom Graves <tgra...@yahoo-inc.com>
Committed: Fri Aug 19 10:11:41 2016 -0500

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 22 +++++++++++++++++++-
 1 file changed, 21 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d0707c6b/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 699dc51..37e0678 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -35,7 +35,7 @@ import scala.util.control.NonFatal
 import com.google.common.collect.MapMaker
 import org.apache.commons.lang3.SerializationUtils
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.io.{ArrayWritable, BooleanWritable, BytesWritable, DoubleWritable,
   FloatWritable, IntWritable, LongWritable, NullWritable, Text, Writable}
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf, SequenceFileInputFormat,
@@ -960,6 +960,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.getLocal(conf)
+
     // Add necessary security credentials to the JobConf before broadcasting it.
     SparkHadoopUtil.get.addCredentials(conf)
     new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minPartitions)
@@ -980,6 +985,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       valueClass: Class[V],
       minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.get(new URI(path), hadoopConfiguration)
+
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
     val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
@@ -1064,6 +1074,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       vClass: Class[V],
       conf: Configuration = hadoopConfiguration): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.get(new URI(path), hadoopConfiguration)
+
     // The call to NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
     val job = NewHadoopJob.getInstance(conf)
@@ -1098,6 +1113,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       kClass: Class[K],
       vClass: Class[V]): RDD[(K, V)] = withScope {
     assertNotStopped()
+
+    // This is a hack to enforce loading hdfs-site.xml.
+    // See SPARK-11227 for details.
+    FileSystem.getLocal(conf)
+
     // Add necessary security credentials to the JobConf. Required to access secure HDFS.
     val jconf = new JobConf(conf)
     SparkHadoopUtil.get.addCredentials(jconf)
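
On clusters running a build without this change, roughly the same effect can be had from user code by touching the HDFS `FileSystem` on the driver before creating the RDD. A sketch (the `hdfs://hacluster/` URI and the use of `sc` from the shell are assumptions for illustration, not part of the patch):

```
import java.net.URI
import org.apache.hadoop.fs.FileSystem

// Same idea as the hack above, applied from user code: getting the target
// FileSystem on the driver instantiates DistributedFileSystem, which pulls in
// HdfsConfiguration (and with it hdfs-site.xml) before the Hadoop
// configuration is broadcast to the executors.
FileSystem.get(new URI("hdfs://hacluster/"), sc.hadoopConfiguration)

sc.textFile("hdfs://hacluster/<path on HDFS>").collect
```

The patch itself uses `FileSystem.getLocal(conf)` in the `JobConf`-based variants (where no path is available) and `FileSystem.get(new URI(path), hadoopConfiguration)` where one is; presumably either call suffices because resolving the file system implementations loads the HDFS classes, which in turn register `hdfs-site.xml` as a default resource.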

