This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6093a78  [SPARK-34558][SQL] warehouse path should be qualified ahead of populating and use
6093a78 is described below

commit 6093a78dbd310209f574567a50e5e216021e6ae8
Author: Kent Yao <y...@apache.org>
AuthorDate: Tue Mar 2 15:14:19 2021 +0000

    [SPARK-34558][SQL] warehouse path should be qualified ahead of populating and use
    
    ### What changes were proposed in this pull request?
    
    Currently, the warehouse path only gets fully qualified on the caller side
    when creating a database, table, partition, etc. The unqualified path is
    what gets populated into the Spark and Hadoop confs, which leads to
    inconsistent API behavior. We should qualify it ahead of time.
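
    For illustration only (not part of this patch): "qualifying" here means
    resolving the configured value against the underlying Hadoop `FileSystem`,
    so a relative value such as `lakehouse` becomes an absolute URI like
    `file:/<current-working-dir>/lakehouse`. A minimal standalone sketch,
    assuming only a plain Hadoop `Configuration` with the default local file
    system (the object name is made up for the example):

    ```scala
    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    object QualifyWarehousePathExample {
      def main(args: Array[String]): Unit = {
        // Hypothetical standalone example; after this patch SharedState does
        // the equivalent once, before writing the value into the confs.
        val raw = new Path("lakehouse")
        val conf = new Configuration()
        // makeQualified resolves the path against the file system's scheme,
        // authority, and current working directory.
        val qualified = raw.getFileSystem(conf).makeQualified(raw)
        println(qualified) // e.g. file:/<current-working-dir>/lakehouse
      }
    }
    ```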
    
    When the value is a relative path, e.g. `spark.sql.warehouse.dir=lakehouse`,
    several behaviors become inconsistent. For example:
    
    If the default database is absent at runtime, the app fails with
    
    ```java
    Caused by: java.lang.IllegalArgumentException: java.net.URISyntaxException: Relative path in absolute URI: file:./lakehouse
        at org.apache.hadoop.fs.Path.initialize(Path.java:263)
        at org.apache.hadoop.fs.Path.<init>(Path.java:254)
        at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:133)
        at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:137)
        at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:150)
        at org.apache.hadoop.hive.metastore.Warehouse.getDefaultDatabasePath(Warehouse.java:163)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB_core(HiveMetaStore.java:636)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:655)
        at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:431)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invokeInternal(RetryingHMSHandler.java:148)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.invoke(RetryingHMSHandler.java:107)
        at org.apache.hadoop.hive.metastore.RetryingHMSHandler.<init>(RetryingHMSHandler.java:79)
        ... 73 more
    ```
    
    If the default database is present at runtime, the app can work with it,
    and if we create a database, its location gets fully qualified, for example:
    
    ```sql
    spark-sql> create database test;
    Time taken: 0.052 seconds
    spark-sql> desc database test;
    Database Name       test
    Comment
    Location    file:/Users/kentyao/Downloads/spark/spark-3.2.0-SNAPSHOT-bin-20210226/lakehouse/test.db
    Owner       kentyao
    Time taken: 0.023 seconds, Fetched 4 row(s)
    ```
    
    Another issue is that the log output is ambiguous, for example:
    
    ```
    21/02/27 13:54:17 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('datalake').
    21/02/27 13:54:17 INFO SharedState: Warehouse path is 'lakehouse'.
    ```
    
    ### Why are the changes needed?
    
    Fixes the bug and the ambiguity described above.

    ### Does this PR introduce _any_ user-facing change?
    
    Yes, the path is now resolved in the proper order:
    `warehouse->database->table->partition`.
    
    ### How was this patch tested?
    
    New unit tests added.
    
    Closes #31671 from yaooqinn/SPARK-34558.
    
    Authored-by: Kent Yao <y...@apache.org>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/internal/SharedState.scala    | 16 +++++----
 .../spark/sql/SparkSessionBuilderSuite.scala       | 41 ++++++++++++++++++----
 .../spark/sql/hive/HiveSharedStateSuite.scala      | 20 ++++++-----
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      | 12 ++++---
 4 files changed, 62 insertions(+), 27 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index ac1dd4e..852a9b1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -26,7 +26,7 @@ import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.FsUrlStreamHandlerFactory
+import org.apache.hadoop.fs.{FsUrlStreamHandlerFactory, Path}
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException}
 import org.apache.spark.internal.Logging
@@ -253,9 +253,8 @@ object SharedState extends Logging {
     val warehousePath = if (hiveWarehouseDir != null && sparkWarehouseOption.isEmpty) {
       // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
       // we will respect the value of hive.metastore.warehouse.dir.
-      sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
       logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
-        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
+        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey.")
       hiveWarehouseDir
     } else {
       // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
@@ -264,12 +263,15 @@ object SharedState extends Logging {
       // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
       val sparkWarehouseDir = sparkWarehouseOption.getOrElse(WAREHOUSE_PATH.defaultValueString)
       logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
-        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkConf.set(WAREHOUSE_PATH.key, sparkWarehouseDir)
-      hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
+        s"${WAREHOUSE_PATH.key}.")
       sparkWarehouseDir
     }
-    logInfo(s"Warehouse path is '$warehousePath'.")
+
+    val tempPath = new Path(warehousePath)
+    val qualifiedWarehousePath = tempPath.getFileSystem(hadoopConf).makeQualified(tempPath).toString
+    sparkConf.set(WAREHOUSE_PATH.key, qualifiedWarehousePath)
+    hadoopConf.set(hiveWarehouseKey, qualifiedWarehousePath)
+    logInfo(s"Warehouse path is '$qualifiedWarehousePath'.")
     initialConfigs -- Seq(WAREHOUSE_PATH.key, hiveWarehouseKey)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
index 1f16bb6..e8e2f68 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SparkSessionBuilderSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql
 
+import org.apache.hadoop.fs.Path
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.{SparkConf, SparkContext, SparkException, SparkFunSuite}
@@ -240,7 +241,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
       .getOrCreate()
     assert(session.conf.get("spark.app.name") === "test-app-SPARK-31532-2")
     assert(session.conf.get(GLOBAL_TEMP_DATABASE) === "globaltempdb-spark-31532-2")
-    assert(session.conf.get(WAREHOUSE_PATH) === "SPARK-31532-db-2")
+    assert(session.conf.get(WAREHOUSE_PATH) contains "SPARK-31532-db-2")
   }
 
   test("SPARK-32062: reset listenerRegistered in SparkSession") {
@@ -306,14 +307,14 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     // newly specified values
     val sharedWH = spark.sharedState.conf.get(wh)
     val sharedTD = spark.sharedState.conf.get(td)
-    assert(sharedWH === "./data2",
+    assert(sharedWH contains "data2",
       "The warehouse dir in shared state should be determined by the 1st 
created spark session")
     assert(sharedTD === "alice",
       "Static sql configs in shared state should be determined by the 1st 
created spark session")
     assert(spark.sharedState.conf.getOption(custom).isEmpty,
       "Dynamic sql configs is session specific")
 
-    assert(spark.conf.get(wh) === sharedWH,
+    assert(spark.conf.get(wh) contains sharedWH,
       "The warehouse dir in session conf and shared state conf should be 
consistent")
     assert(spark.conf.get(td) === sharedTD,
       "Static sql configs in session conf and shared state conf should be 
consistent")
@@ -321,7 +322,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
 
     spark.sql("RESET")
 
-    assert(spark.conf.get(wh) === sharedWH,
+    assert(spark.conf.get(wh) contains sharedWH,
       "The warehouse dir in shared state should be respect after RESET")
     assert(spark.conf.get(td) === sharedTD,
       "Static sql configs in shared state should be respect after RESET")
@@ -331,7 +332,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     val spark2 = SparkSession.builder()
       .config(wh, "./data3")
       .config(custom, "kyaoo").getOrCreate()
-    assert(spark2.conf.get(wh) === sharedWH)
+    assert(spark2.conf.get(wh) contains sharedWH)
     assert(spark2.conf.get(td) === sharedTD)
     assert(spark2.conf.get(custom) === "kyaoo")
   }
@@ -352,7 +353,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     spark.sql(s"SET $custom=c1")
     assert(spark.conf.get(custom) === "c1")
     spark.sql("RESET")
-    assert(spark.conf.get(wh) === "./data0",
+    assert(spark.conf.get(wh) contains "data0",
       "The warehouse dir in shared state should be respect after RESET")
     assert(spark.conf.get(td) === "bob",
       "Static sql configs in shared state should be respect after RESET")
@@ -381,7 +382,7 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     spark2.sql(s"SET $custom=c1")
     assert(spark2.conf.get(custom) === "c1")
     spark2.sql("RESET")
-    assert(spark2.conf.get(wh) === "./data1")
+    assert(spark2.conf.get(wh) contains "data1")
     assert(spark2.conf.get(td) === "alice")
     assert(spark2.conf.get(custom) === "c2")
 
@@ -412,4 +413,30 @@ class SparkSessionBuilderSuite extends SparkFunSuite with BeforeAndAfterEach {
     }
     assert(!logAppender.loggingEvents.exists(_.getRenderedMessage.contains(msg)))
   }
+
+  Seq(".", "..", "dir0", "dir0/dir1", "/dir0/dir1", "./dir0").foreach { 
pathStr =>
+    test(s"SPARK-34558: warehouse path ($pathStr) should be qualified for 
spark/hadoop conf") {
+      val path = new Path(pathStr)
+      val conf = new SparkConf().set(WAREHOUSE_PATH, pathStr)
+      val session = SparkSession.builder()
+        .master("local")
+        .config(conf)
+        .getOrCreate()
+      val hadoopConf = session.sessionState.newHadoopConf()
+      val expected = path.getFileSystem(hadoopConf).makeQualified(path).toString
+      // session related configs
+      assert(hadoopConf.get("hive.metastore.warehouse.dir") === expected)
+      assert(session.conf.get(WAREHOUSE_PATH) === expected)
+      assert(session.sessionState.conf.warehousePath === expected)
+
+      // shared configs
+      assert(session.sharedState.conf.get(WAREHOUSE_PATH) === expected)
+      assert(session.sharedState.hadoopConf.get("hive.metastore.warehouse.dir") === expected)
+
+      // spark context configs
+      assert(session.sparkContext.conf.get(WAREHOUSE_PATH) === expected)
+      assert(session.sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir") ===
+        expected)
+    }
+  }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 4570e72..d23293b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.hive
 
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
 
 import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
@@ -35,7 +37,7 @@ class HiveSharedStateSuite extends SparkFunSuite {
   test("initial configs should be passed to SharedState but not SparkContext") 
{
     val conf = new SparkConf().setMaster("local").setAppName("SharedState 
Test")
     val sc = SparkContext.getOrCreate(conf)
-    val wareHouseDir = Utils.createTempDir().toString
+    val warehousePath = Utils.createTempDir().toString
     val invalidPath = "invalid/path"
     val metastorePath = Utils.createTempDir()
     val tmpDb = "tmp_db"
@@ -45,8 +47,8 @@ class HiveSharedStateSuite extends SparkFunSuite {
     // Especially, all these configs are passed to the cloned confs inside SharedState for sharing
     // cross sessions.
     val initialConfigs = Map("spark.foo" -> "bar",
-      WAREHOUSE_PATH.key -> wareHouseDir,
-      ConfVars.METASTOREWAREHOUSE.varname -> wareHouseDir,
+      WAREHOUSE_PATH.key -> warehousePath,
+      ConfVars.METASTOREWAREHOUSE.varname -> warehousePath,
       CATALOG_IMPLEMENTATION.key -> "hive",
       ConfVars.METASTORECONNECTURLKEY.varname ->
         s"jdbc:derby:;databaseName=$metastorePath/metastore_db;create=true",
@@ -56,9 +58,11 @@ class HiveSharedStateSuite extends SparkFunSuite {
     initialConfigs.foreach { case (k, v) => builder.config(k, v) }
     val ss = builder.getOrCreate()
     val state = ss.sharedState
-    assert(sc.conf.get(WAREHOUSE_PATH.key) === wareHouseDir,
+    val qualifiedWHPath =
+      FileUtils.makeQualified(new Path(warehousePath), sc.hadoopConfiguration).toString
+    assert(sc.conf.get(WAREHOUSE_PATH.key) === qualifiedWHPath,
       "initial warehouse conf in session options can affect application wide spark conf")
-    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) === wareHouseDir,
+    assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) === qualifiedWHPath,
       "initial warehouse conf in session options can affect application wide hadoop conf")
 
     assert(!state.sparkContext.conf.contains("spark.foo"),
@@ -68,7 +72,7 @@ class HiveSharedStateSuite extends SparkFunSuite {
     val client = state.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client
     assert(client.getConf("spark.foo", "") === "bar",
       "session level conf should be passed to catalog")
-    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, "") === wareHouseDir,
+    assert(client.getConf(ConfVars.METASTOREWAREHOUSE.varname, "") === qualifiedWHPath,
       "session level conf should be passed to catalog")
 
     assert(state.globalTempViewManager.database === tmpDb)
@@ -76,12 +80,12 @@ class HiveSharedStateSuite extends SparkFunSuite {
    val ss2 =
      builder.config("spark.foo", "bar2222").config(WAREHOUSE_PATH.key, 
invalidPath).getOrCreate()
 
-    assert(ss2.sparkContext.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
+    assert(!ss2.sparkContext.conf.get(WAREHOUSE_PATH.key).contains(invalidPath),
      "warehouse conf in session options can't affect application wide spark conf")
     assert(ss2.sparkContext.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !==
      invalidPath, "warehouse conf in session options can't affect application wide hadoop conf")
     assert(ss.conf.get("spark.foo") === "bar2222", "session level conf should be passed to catalog")
-    assert(ss.conf.get(WAREHOUSE_PATH) !== invalidPath,
+    assert(!ss.conf.get(WAREHOUSE_PATH).contains(invalidPath),
       "session level conf should be passed to catalog")
   }
 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 4e64b4d..af8a23d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -23,6 +23,7 @@ import scala.util.Properties
 
 import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.hive.common.FileUtils
 import org.scalatest.Assertions._
 import org.scalatest.BeforeAndAfterEach
 import org.scalatest.matchers.must.Matchers
@@ -408,10 +409,11 @@ object SetWarehouseLocationTest extends Logging {
 
     }
 
-    if (sparkSession.conf.get(WAREHOUSE_PATH.key) != expectedWarehouseLocation) {
+    val qualifiedWHPath = FileUtils.makeQualified(
+      new Path(expectedWarehouseLocation), sparkSession.sparkContext.hadoopConfiguration).toString
+    if (sparkSession.conf.get(WAREHOUSE_PATH.key) != qualifiedWHPath) {
       throw new Exception(
-        s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location " +
-        s"$expectedWarehouseLocation.")
+        s"${WAREHOUSE_PATH.key} is not set to the expected warehouse location $qualifiedWHPath.")
     }
 
     val catalog = sparkSession.sessionState.catalog
@@ -424,7 +426,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", 
Some("default")))
       val expectedLocation =
-        
CatalogUtils.stringToURI(s"file:${expectedWarehouseLocation.toString}/testlocation")
+        CatalogUtils.stringToURI(s"$qualifiedWHPath/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(
@@ -440,7 +442,7 @@ object SetWarehouseLocationTest extends Logging {
       val tableMetadata =
         catalog.getTableMetadata(TableIdentifier("testLocation", 
Some("testLocationDB")))
       val expectedLocation = CatalogUtils.stringToURI(
-        
s"file:${expectedWarehouseLocation.toString}/testlocationdb.db/testlocation")
+        s"$qualifiedWHPath/testlocationdb.db/testlocation")
       val actualLocation = tableMetadata.location
       if (actualLocation != expectedLocation) {
         throw new Exception(

