This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7260146  [SPARK-31692][SQL] Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory
7260146 is described below

commit 72601460ada41761737f39d5dff8e69444fce2ba
Author: Karuppayya Rajendran <karuppayya1...@gmail.com>
AuthorDate: Wed May 13 23:18:38 2020 -0700

    [SPARK-31692][SQL] Pass hadoop confs specified via Spark confs to URLStreamHandlerFactory
    
    ### What changes were proposed in this pull request?
    Pass hadoop confs specified via Spark confs to the URLStreamHandlerFactory.
    
    ### Why are the changes needed?
    
    **BEFORE**
    ```
    ➜  spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
    
    scala> spark.sharedState
    res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState@5793cd84
    
    scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
    res1: java.io.InputStream = org.apache.hadoop.fs.ChecksumFileSystem$FSDataBoundedInputStream@22846025
    
    scala> import org.apache.hadoop.fs._
    import org.apache.hadoop.fs._
    
    scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration)
    res2: org.apache.hadoop.fs.FileSystem = org.apache.hadoop.fs.LocalFileSystem@5a930c03
    ```
    
    **AFTER**
    ```
    ➜  spark git:(SPARK-31692) ✗ ./bin/spark-shell --conf spark.hadoop.fs.file.impl=org.apache.hadoop.fs.RawLocalFileSystem
    
    scala> spark.sharedState
    res0: org.apache.spark.sql.internal.SharedState = org.apache.spark.sql.internal.SharedState@5c24a636
    
    scala> new java.net.URL("file:///tmp/1.txt").openConnection.getInputStream
    res1: java.io.InputStream = org.apache.hadoop.fs.FSDataInputStream@2ba8f528
    
    scala> import org.apache.hadoop.fs._
    import org.apache.hadoop.fs._
    
    scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration)
    res2: org.apache.hadoop.fs.FileSystem = LocalFS
    
    scala>  FileSystem.get(new Path("file:///tmp/1.txt").toUri, spark.sparkContext.hadoopConfiguration).getClass
    res3: Class[_ <: org.apache.hadoop.fs.FileSystem] = class org.apache.hadoop.fs.RawLocalFileSystem
    ```
    The type of the FileSystem object created (see the last statement in each snippet above) differs between the two cases, which should not have been the case.
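    
    As a side note on the mechanism (an inference, not part of the original patch description): Hadoop's no-argument `FsUrlStreamHandlerFactory()` constructor builds its own fresh `Configuration`, so it never sees the `spark.hadoop.*` overrides, while `FsUrlStreamHandlerFactory(Configuration)` resolves `fs.file.impl` from the configuration it is given. A minimal sketch of the difference, assuming hadoop-common is on the classpath:
    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
    
    // Stand-in for the Hadoop conf Spark derives from --conf spark.hadoop.* settings.
    val sparkHadoopConf = new Configuration()
    sparkHadoopConf.set("fs.file.impl", "org.apache.hadoop.fs.RawLocalFileSystem")
    
    // Ignores the override: resolves file:// against its own new Configuration().
    val defaultFactory = new FsUrlStreamHandlerFactory()
    // Honors the override: resolves file:// against the supplied configuration.
    val sparkAwareFactory = new FsUrlStreamHandlerFactory(sparkHadoopConf)
    ```
    Only the second form can agree with `FileSystem.get(uri, spark.sparkContext.hadoopConfiguration)` when `fs.file.impl` is overridden via Spark confs.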
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Tested locally.
    Added a unit test.
    
    Closes #28516 from karuppayya/SPARK-31692.
    
    Authored-by: Karuppayya Rajendran <karuppayya1...@gmail.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/sql/internal/SharedState.scala    |  6 +--
 .../spark/sql/internal/SharedStateSuite.scala      | 55 ++++++++++++++++++++++
 2 files changed, 58 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 47119ab..ce4385d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -53,7 +53,7 @@ private[sql] class SharedState(
     initialConfigs: scala.collection.Map[String, String])
   extends Logging {
 
-  SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
+  SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf, sparkContext.hadoopConfiguration)
 
   private val (conf, hadoopConf) = {
     // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
@@ -174,13 +174,13 @@ private[sql] class SharedState(
 object SharedState extends Logging {
   @volatile private var fsUrlStreamHandlerFactoryInitialized = false
 
-  private def setFsUrlStreamHandlerFactory(conf: SparkConf): Unit = {
+  private def setFsUrlStreamHandlerFactory(conf: SparkConf, hadoopConf: Configuration): Unit = {
     if (!fsUrlStreamHandlerFactoryInitialized &&
         conf.get(DEFAULT_URL_STREAM_HANDLER_FACTORY_ENABLED)) {
       synchronized {
         if (!fsUrlStreamHandlerFactoryInitialized) {
           try {
-            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())
+            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory(hadoopConf))
             fsUrlStreamHandlerFactoryInitialized = true
           } catch {
             case NonFatal(_) =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
new file mode 100644
index 0000000..81bf153
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SharedStateSuite.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.internal
+
+import java.net.URL
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
+
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.test.SharedSparkSession
+
+
+/**
+ * Tests for [[org.apache.spark.sql.internal.SharedState]].
+ */
+class SharedStateSuite extends SharedSparkSession {
+
+  override protected def sparkConf: SparkConf = {
+    super.sparkConf
+      .set("spark.hadoop.fs.defaultFS", "file:///")
+  }
+
+  test("SPARK-31692: Url handler factory should have the hadoop configs from 
Spark conf") {
+    // Accessing shared state to init the object since it is `lazy val`
+    spark.sharedState
+    val field = classOf[URL].getDeclaredField("factory")
+    field.setAccessible(true)
+    val value = field.get(null)
+    assert(value.isInstanceOf[FsUrlStreamHandlerFactory])
+    val streamFactory = value.asInstanceOf[FsUrlStreamHandlerFactory]
+
+    val confField = classOf[FsUrlStreamHandlerFactory].getDeclaredField("conf")
+    confField.setAccessible(true)
+    val conf = confField.get(streamFactory)
+
+    assert(conf.isInstanceOf[Configuration])
+    assert(conf.asInstanceOf[Configuration].get("fs.defaultFS") == "file:///")
+  }
+}
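
For background on why passing `sparkContext.hadoopConfiguration` carries the `--conf spark.hadoop.*` settings: Spark copies `spark.hadoop.*`-prefixed entries from the `SparkConf` into the Hadoop `Configuration` it builds for the driver (the real logic lives in `SparkHadoopUtil`). A rough sketch of that mapping, with `hadoopConfFrom` as a hypothetical helper name:
```scala
import org.apache.hadoop.conf.Configuration

import org.apache.spark.SparkConf

// Hypothetical helper illustrating how spark.hadoop.* keys become Hadoop conf entries;
// Spark's own implementation (SparkHadoopUtil) covers more cases than this sketch.
def hadoopConfFrom(sparkConf: SparkConf): Configuration = {
  val hadoopConf = new Configuration()
  sparkConf.getAll.foreach { case (key, value) =>
    if (key.startsWith("spark.hadoop.")) {
      hadoopConf.set(key.stripPrefix("spark.hadoop."), value)
    }
  }
  hadoopConf
}
```
With the change above, the factory registered in `SharedState` receives a configuration built this way, so `new java.net.URL("file:///tmp/1.txt").openConnection` and `FileSystem.get` resolve `fs.file.impl` consistently.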


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
