This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 321341a  [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir
321341a is described below

commit 321341a4c3104380035350631c82a4b385f117e4
Author: Kent Yao <yaooq...@hotmail.com>
AuthorDate: Tue Mar 17 23:03:18 2020 +0800

    [SPARK-31170][SQL] Spark SQL Cli should respect hive-site.xml and spark.sql.warehouse.dir

    ### What changes were proposed in this pull request?

    In the Spark SQL CLI, we create a Hive `CliSessionState` that does not load
    `hive-site.xml`, so the configurations in `hive-site.xml` do not take effect
    as they do in other Spark-Hive integration apps.

    The warehouse directory is also not picked up correctly. If the `default`
    database does not exist, the `CliSessionState` creates one the first time it
    talks to the metastore. The `Location` of the default database will be
    neither the value of `spark.sql.warehouse.dir` nor the user-specified value
    of `hive.metastore.warehouse.dir`, but the default value of
    `hive.metastore.warehouse.dir`, which is always `/user/hive/warehouse`.

    ### Why are the changes needed?

    Bug fix: the Spark SQL CLI should pick up the right configurations.

    ### Does this PR introduce any user-facing change?

    Yes. A non-existent default database will now be created at the location the
    user specifies via `spark.sql.warehouse.dir` or
    `hive.metastore.warehouse.dir`, or at the default value of
    `spark.sql.warehouse.dir` if neither is specified.

    ### How was this patch tested?

    Added a CLI unit test.

    Closes #27933 from yaooqinn/SPARK-31170.

    Authored-by: Kent Yao <yaooq...@hotmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 5bc0d76591b46f0c1c9ec283ee8e1c5da76e67d6)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/internal/SharedState.scala    | 80 +++++++++++-----------
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  |  2 +
 .../spark/sql/hive/thriftserver/CliSuite.scala     | 12 ++++
 .../spark/sql/hive/HiveSharedStateSuite.scala      |  1 -
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      |  2 +-
 5 files changed, 55 insertions(+), 42 deletions(-)
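The resolution rule described above boils down to three cases. As a condensed,
illustrative sketch (the helper `resolveWarehousePath` is hypothetical and not
part of this commit; the real logic lives in `SharedState.loadHiveConfFile` in
the diff below):

    // Hypothetical sketch of the precedence rule, not the patch itself.
    def resolveWarehousePath(
        sparkWarehouseDir: Option[String],  // spark.sql.warehouse.dir, if set
        hiveWarehouseDir: Option[String],   // hive.metastore.warehouse.dir, if set
        sparkDefault: String                // default of spark.sql.warehouse.dir
      ): String = (sparkWarehouseDir, hiveWarehouseDir) match {
      // spark.sql.warehouse.dir always wins when it is set explicitly.
      case (Some(sparkDir), _)   => sparkDir
      // Otherwise fall back to hive.metastore.warehouse.dir from hive-site.xml.
      case (None, Some(hiveDir)) => hiveDir
      // If neither is set, use the default of spark.sql.warehouse.dir.
      case (None, None)          => sparkDefault
    }

Whichever branch is taken, the patch writes the resolved path back into both
the Spark conf and the Hadoop conf so the two can no longer disagree.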
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5347264..eb74e96 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -41,7 +41,6 @@ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, Streamin
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
-
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
  *
@@ -55,45 +54,10 @@ private[sql] class SharedState(
 
   SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
 
-  // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
-  // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
-  val warehousePath: String = {
-    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
-    if (configFile != null) {
-      logInfo(s"loading hive config file: $configFile")
-      sparkContext.hadoopConfiguration.addResource(configFile)
-    }
-
-    // hive.metastore.warehouse.dir only stay in hadoopConf
-    sparkContext.conf.remove("hive.metastore.warehouse.dir")
-    // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
-      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
-      // we will respect the value of hive.metastore.warehouse.dir.
-      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
-      logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
-        s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
-        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
-      hiveWarehouseDir
-    } else {
-      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
-      // the value of spark.sql.warehouse.dir.
-      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
-      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
-      logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
-        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
-      sparkWarehouseDir
-    }
-  }
-  logInfo(s"Warehouse path is '$warehousePath'.")
-
-  // These 2 variables should be initiated after `warehousePath`, because in the first place we need
-  // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
-  // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
   private val (conf, hadoopConf) = {
+    // Load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
+    // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
+    SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
     val confClone = sparkContext.conf.clone()
     val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
     // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -166,7 +130,7 @@ private[sql] class SharedState(
     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
       "default database",
-      CatalogUtils.stringToURI(warehousePath),
+      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
       Map())
     // Create default database if it doesn't exist
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
@@ -258,4 +222,40 @@ object SharedState extends Logging {
       throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
     }
   }
+
+  /**
+   * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+   * the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
+   */
+  def loadHiveConfFile(sparkConf: SparkConf, hadoopConf: Configuration): Unit = {
+    val hiveWarehouseKey = "hive.metastore.warehouse.dir"
+    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    if (configFile != null) {
+      logInfo(s"loading hive config file: $configFile")
+      hadoopConf.addResource(configFile)
+    }
+    // hive.metastore.warehouse.dir only stay in hadoopConf
+    sparkConf.remove(hiveWarehouseKey)
+    // Set the Hive metastore warehouse path to the one we use
+    val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
+    val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
+      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+      // we will respect the value of hive.metastore.warehouse.dir.
+      sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
+        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
+      hiveWarehouseDir
+    } else {
+      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+      // the value of spark.sql.warehouse.dir.
+      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set
+      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+      val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+      logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
+        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
+      hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
+      sparkWarehouseDir
+    }
+    logInfo(s"Warehouse path is '$warehousePath'.")
+  }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6b76927..3ddf4ec 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
+import org.apache.spark.sql.internal.SharedState
 import org.apache.spark.util.ShutdownHookManager
 
 /**
@@ -87,6 +88,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
 
     val sparkConf = new SparkConf(loadDefaults = true)
     val hadoopConf = SparkHadoopUtil.get.newConfiguration(sparkConf)
+    SharedState.loadHiveConfFile(sparkConf, hadoopConf)
     val extraConfigs = HiveUtils.formatTimeVarsForHiveClient(hadoopConf)
     val cliConf = HiveClientImpl.newHiveConf(sparkConf, hadoopConf, extraConfigs)
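The two lines added to SparkSQLCLIDriver above make the CLI resolve the
warehouse path the same way a Hive-enabled `SparkSession` already does. A
hedged sketch of how the user-visible effect can be checked from application
code (the master and path are placeholders, and a fresh metastore is assumed);
the new CliSuite test below asserts the same thing through `spark-sql` with
`desc database default`:

    import org.apache.spark.sql.SparkSession

    // Assumes a fresh metastore, so the default database is created now.
    val spark = SparkSession.builder()
      .master("local[1]")                                  // placeholder master
      .config("spark.sql.warehouse.dir", "/tmp/spark-wh")  // placeholder path
      .enableHiveSupport()
      .getOrCreate()

    // With this patch, the freshly created default database lands under
    // /tmp/spark-wh instead of Hive's hard-coded /user/hive/warehouse.
    println(spark.catalog.getDatabase("default").locationUri)
    spark.stop()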
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 43aafc3..ed77663 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -32,6 +32,7 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.test.HiveTestJars
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -159,6 +160,17 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
     }
   }
 
+  test("Pick spark.sql.warehouse.dir first for Spark Cli if set") {
+    val sparkWareHouseDir = Utils.createTempDir()
+    new File(warehousePath, "metastore_db").delete()
+    runCliWithin(
+      1.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=$sparkWareHouseDir"))(
+      "desc database default;" -> sparkWareHouseDir.getAbsolutePath
+    )
+    sparkWareHouseDir.delete()
+  }
+
   test("Simple commands") {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
index 6e2dcfc..78535b0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala
@@ -45,7 +45,6 @@ class HiveSharedStateSuite extends SparkFunSuite {
       GLOBAL_TEMP_DATABASE.key -> tmpDb)
     val state = new SharedState(sc, initialConfigs)
 
-    assert(state.warehousePath !== invalidPath, "warehouse path can't determine by session options")
     assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath,
       "warehouse conf in session options can't affect application wide spark conf")
     assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath,
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
index 31ff62e..8b97489 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala
@@ -787,7 +787,7 @@ object SPARK_18360 {
       .enableHiveSupport().getOrCreate()
 
     val defaultDbLocation = spark.catalog.getDatabase("default").locationUri
-    assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath))
+    assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH)))
 
     val hiveClient =
       spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org