This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 5f5ee4d  [SPARK-31170][SQL] Spark SQL CLI should respect hive-site.xml and spark.sql.warehouse.dir
5f5ee4d is described below

commit 5f5ee4d84acc933112c52b1818a865139c2af05a
Author: Kent Yao <yaooq...@hotmail.com>
AuthorDate: Fri Mar 27 12:05:45 2020 +0800

    [SPARK-31170][SQL] Spark SQL CLI should respect hive-site.xml and spark.sql.warehouse.dir

    ### What changes were proposed in this pull request?

    In the Spark SQL CLI, we create a Hive `CliSessionState` that does not load `hive-site.xml`, so the configurations in `hive-site.xml` do not take effect as they do in other Spark-Hive integration apps. The warehouse directory is also not picked up correctly: if the `default` database does not exist, the `CliSessionState` creates one the first time it talks to the metastore, and the `Location` of the default database will be neither the value of `spark.sql.warehouse.dir` nor the user-specified value of `hive.metastore.warehouse.dir`, but the default value of `hive.metastore.warehouse.dir`, which is always `/user/hive/warehouse`.

    This PR fixes the CliSuite failure with the hive-1.2 profile in https://github.com/apache/spark/pull/27933. In https://github.com/apache/spark/pull/27933, we fixed the JIRA issue by deciding the warehouse dir using all properties from the Spark conf and the Hadoop conf, but properties from `--hiveconf` were not included; they are applied to the `CliSessionState` instance after it is initialized. When such a command-line option's key is `hive.metastore.warehouse.dir`, the actual warehouse dir gets overridden. Because the logic in Hive for creating the non-existing default database changed, that test p [...] `spark.hive.xxx > spark.hadoop.xxx > --hiveconf xxx > hive-site.xml` through `SharedState.loadHiveConfFile` before the SessionState starts.

    ### Why are the changes needed?

    Bug fix for the Spark SQL CLI to pick up the right configurations.

    ### Does this PR introduce any user-facing change?

    Yes:
    1. The non-existent default database will be created in the location specified by users via `spark.sql.warehouse.dir` or `hive.metastore.warehouse.dir`, or in the default value of `spark.sql.warehouse.dir` if neither is specified.
    2. Configurations from `hive-site.xml` will no longer override command-line options or properties defined with the `spark.hadoop.` (or `spark.hive.`) prefix in the Spark conf.

    ### How was this patch tested?

    Added a CLI unit test.

    Closes #27969 from yaooqinn/SPARK-31170-2.
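    For context on the precedence chain above, here is a minimal, self-contained sketch of why `hive-site.xml` loses to values that are already set: entries from the file are merged into the Hadoop conf with `Configuration.setIfUnset`, so keys already populated from `--hiveconf` or `spark.hadoop.*`/`spark.hive.*` properties win. This is illustrative only, not the committed code; `PrecedenceSketch` and the paths are made up, and it assumes `hadoop-common` is on the classpath.

    ```scala
    import org.apache.hadoop.conf.Configuration

    // Hypothetical demo of the merge rule used for hive-site.xml entries.
    object PrecedenceSketch {
      def main(args: Array[String]): Unit = {
        // `false` skips loading Hadoop's default resources; we only want our keys.
        val hadoopConf = new Configuration(false)

        // Suppose --hiveconf (or a spark.hadoop.* property) already set this key.
        hadoopConf.set("hive.metastore.warehouse.dir", "/tmp/from_hiveconf")

        // Entries as they might appear in hive-site.xml (values are made up).
        val fromHiveSite = Seq(
          "hive.metastore.warehouse.dir" -> "/tmp/hive_one", // loses: already set
          "hive.in.test" -> "true")                          // wins: previously unset

        // setIfUnset only writes keys that have no value yet, which is what gives
        // hive-site.xml the lowest precedence in the chain.
        fromHiveSite.foreach { case (k, v) => hadoopConf.setIfUnset(k, v) }

        assert(hadoopConf.get("hive.metastore.warehouse.dir") == "/tmp/from_hiveconf")
        assert(hadoopConf.get("hive.in.test") == "true")
        println("hive-site.xml values applied only where nothing was set before")
      }
    }
    ```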
    Authored-by: Kent Yao <yaooq...@hotmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit 8be16907c261657f83f5d5934bcd978d8dacf7ff)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/internal/SharedState.scala    | 87 +++++++++++---------
 .../sql/hive/thriftserver/SparkSQLCLIDriver.scala  |  9 +-
 .../src/test/noclasspath/hive-site.xml             | 30 +++++++
 .../spark/sql/hive/thriftserver/CliSuite.scala     | 96 +++++++++++++++++-----
 .../spark/sql/hive/HiveSharedStateSuite.scala      |  1 -
 .../spark/sql/hive/HiveSparkSubmitSuite.scala      |  2 +-
 6 files changed, 159 insertions(+), 66 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index 5347264..14b8ea6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -22,6 +22,7 @@ import java.util.UUID
 import java.util.concurrent.ConcurrentHashMap
 import javax.annotation.concurrent.GuardedBy
 
+import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.util.control.NonFatal
 
@@ -41,7 +42,6 @@ import org.apache.spark.sql.streaming.ui.{StreamingQueryStatusListener, Streamin
 import org.apache.spark.status.ElementTrackingStore
 import org.apache.spark.util.Utils
 
-
 /**
  * A class that holds all state shared across sessions in a given [[SQLContext]].
  *
@@ -55,45 +55,10 @@ private[sql] class SharedState(
 
   SharedState.setFsUrlStreamHandlerFactory(sparkContext.conf)
 
-  // Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
-  // the config from both hive and Spark SQL. Finally set the warehouse config value to sparkConf.
-  val warehousePath: String = {
-    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
-    if (configFile != null) {
-      logInfo(s"loading hive config file: $configFile")
-      sparkContext.hadoopConfiguration.addResource(configFile)
-    }
-
-    // hive.metastore.warehouse.dir only stay in hadoopConf
-    sparkContext.conf.remove("hive.metastore.warehouse.dir")
-    // Set the Hive metastore warehouse path to the one we use
-    val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
-    if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
-      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
-      // we will respect the value of hive.metastore.warehouse.dir.
-      sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
-      logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
-        s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
-        s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
-      hiveWarehouseDir
-    } else {
-      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
-      // the value of spark.sql.warehouse.dir.
-      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
-      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
-      val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
-      logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
-        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
-      sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
-      sparkWarehouseDir
-    }
-  }
-  logInfo(s"Warehouse path is '$warehousePath'.")
-
-  // These 2 variables should be initiated after `warehousePath`, because in the first place we need
-  // to load hive-site.xml into hadoopConf and determine the warehouse path which will be set into
-  // both spark conf and hadoop conf avoiding be affected by any SparkSession level options
   private val (conf, hadoopConf) = {
+    // Load hive-site.xml into hadoopConf and determine the warehouse path, which will be set
+    // into both the Spark conf and the Hadoop conf to avoid being affected by any
+    // SparkSession-level options.
+    SharedState.loadHiveConfFile(sparkContext.conf, sparkContext.hadoopConfiguration)
     val confClone = sparkContext.conf.clone()
     val hadoopConfClone = new Configuration(sparkContext.hadoopConfiguration)
     // If `SparkSession` is instantiated using an existing `SparkContext` instance and no existing
@@ -166,7 +131,7 @@ private[sql] class SharedState(
     val defaultDbDefinition = CatalogDatabase(
       SessionCatalog.DEFAULT_DATABASE,
       "default database",
-      CatalogUtils.stringToURI(warehousePath),
+      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
       Map())
     // Create default database if it doesn't exist
     if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
@@ -258,4 +223,46 @@ object SharedState extends Logging {
       throw new IllegalArgumentException(s"Error while instantiating '$className':", e)
     }
   }
+
+  /**
+   * Load hive-site.xml into hadoopConf and determine the warehouse path we want to use, based on
+   * the config from both Hive and Spark SQL. Finally, set the warehouse config value to sparkConf.
+   */
+  def loadHiveConfFile(
+      sparkConf: SparkConf,
+      hadoopConf: Configuration): Unit = {
+    val hiveWarehouseKey = "hive.metastore.warehouse.dir"
+    val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
+    if (configFile != null) {
+      logInfo(s"loading hive config file: $configFile")
+      val hadoopConfTemp = new Configuration()
+      hadoopConfTemp.addResource(configFile)
+      hadoopConfTemp.asScala.foreach { entry =>
+        hadoopConf.setIfUnset(entry.getKey, entry.getValue)
+      }
+    }
+    // hive.metastore.warehouse.dir only stays in hadoopConf
+    sparkConf.remove(hiveWarehouseKey)
+    // Set the Hive metastore warehouse path to the one we use
+    val hiveWarehouseDir = hadoopConf.get(hiveWarehouseKey)
+    val warehousePath = if (hiveWarehouseDir != null && !sparkConf.contains(WAREHOUSE_PATH.key)) {
+      // If hive.metastore.warehouse.dir is set and spark.sql.warehouse.dir is not set,
+      // we will respect the value of hive.metastore.warehouse.dir.
+      sparkConf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
+      logInfo(s"${WAREHOUSE_PATH.key} is not set, but $hiveWarehouseKey is set. Setting" +
+        s" ${WAREHOUSE_PATH.key} to the value of $hiveWarehouseKey ('$hiveWarehouseDir').")
+      hiveWarehouseDir
+    } else {
+      // If spark.sql.warehouse.dir is set, we will override hive.metastore.warehouse.dir using
+      // the value of spark.sql.warehouse.dir.
+      // When neither spark.sql.warehouse.dir nor hive.metastore.warehouse.dir is set,
+      // we will set hive.metastore.warehouse.dir to the default value of spark.sql.warehouse.dir.
+      val sparkWarehouseDir = sparkConf.get(WAREHOUSE_PATH)
+      logInfo(s"Setting $hiveWarehouseKey ('$hiveWarehouseDir') to the value of " +
+        s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
+      hadoopConf.set(hiveWarehouseKey, sparkWarehouseDir)
+      sparkWarehouseDir
+    }
+    logInfo(s"Warehouse path is '$warehousePath'.")
+  }
 }
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 6b76927..5ed0cb0 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -46,6 +46,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.hive.HiveUtils
 import org.apache.spark.sql.hive.client.HiveClientImpl
 import org.apache.spark.sql.hive.security.HiveDelegationTokenProvider
+import org.apache.spark.sql.internal.SharedState
 import org.apache.spark.util.ShutdownHookManager
 
 /**
@@ -130,6 +131,7 @@ private[hive] object SparkSQLCLIDriver extends Logging {
       UserGroupInformation.getCurrentUser.addCredentials(credentials)
     }
 
+    SharedState.loadHiveConfFile(sparkConf, conf)
     SessionState.start(sessionState)
 
     // Clean up after we exit
@@ -188,8 +190,11 @@ private[hive] object SparkSQLCLIDriver extends Logging {
     // Execute -i init files (always in silent mode)
     cli.processInitFiles(sessionState)
 
-    newHiveConf.foreach { kv =>
-      SparkSQLEnv.sqlContext.setConf(kv._1, kv._2)
+    // We don't propagate hive.metastore.warehouse.dir, because it might have been adjusted in
+    // [[SharedState.loadHiveConfFile]] based on the user-specified or default values of
+    // spark.sql.warehouse.dir and hive.metastore.warehouse.dir.
+    for ((k, v) <- newHiveConf if k != "hive.metastore.warehouse.dir") {
+      SparkSQLEnv.sqlContext.setConf(k, v)
     }
 
     if (sessionState.execString != null) {
diff --git a/sql/hive-thriftserver/src/test/noclasspath/hive-site.xml b/sql/hive-thriftserver/src/test/noclasspath/hive-site.xml
new file mode 100644
index 0000000..d0bf04d
--- /dev/null
+++ b/sql/hive-thriftserver/src/test/noclasspath/hive-site.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<configuration>
+  <property>
+    <name>hive.in.test</name>
+    <value>true</value>
+    <description>Internal marker for test.</description>
+  </property>
+  <property>
+    <name>hive.metastore.warehouse.dir</name>
+    <value>/tmp/hive_one</value>
+  </property>
+</configuration>
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index 43aafc3..c393054 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -27,22 +27,23 @@ import scala.concurrent.Promise
 import scala.concurrent.duration._
 
 import org.apache.hadoop.hive.conf.HiveConf.ConfVars
-import org.scalatest.BeforeAndAfterAll
+import org.scalatest.{BeforeAndAfterAll, BeforeAndAfterEach}
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.hive.test.HiveTestJars
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.test.ProcessTestUtils.ProcessOutputCapturer
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
- * A test suite for the `spark-sql` CLI tool. Note that all test cases share the same temporary
- * Hive metastore and warehouse.
+ * A test suite for the `spark-sql` CLI tool.
  */
-class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
+class CliSuite extends SparkFunSuite with BeforeAndAfterAll with BeforeAndAfterEach with Logging {
   val warehousePath = Utils.createTempDir()
   val metastorePath = Utils.createTempDir()
   val scratchDirPath = Utils.createTempDir()
+  val sparkWareHouseDir = Utils.createTempDir()
 
   override def beforeAll(): Unit = {
     super.beforeAll()
@@ -53,14 +54,20 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
 
   override def afterAll(): Unit = {
     try {
-      warehousePath.delete()
-      metastorePath.delete()
-      scratchDirPath.delete()
+      Utils.deleteRecursively(warehousePath)
+      Utils.deleteRecursively(metastorePath)
+      Utils.deleteRecursively(scratchDirPath)
     } finally {
       super.afterAll()
    }
   }
 
+  override def afterEach(): Unit = {
+    // Only `runCliWithin` calls within a single test case share the same temporary
+    // Hive metastore.
+    Utils.deleteRecursively(metastorePath)
+  }
+
   /**
    * Run a CLI operation and expect all the queries and expected answers to be returned.
    *
@@ -75,25 +82,35 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
   def runCliWithin(
       timeout: FiniteDuration,
       extraArgs: Seq[String] = Seq.empty,
-      errorResponses: Seq[String] = Seq("Error:"))(
+      errorResponses: Seq[String] = Seq("Error:"),
+      maybeWarehouse: Option[File] = Some(warehousePath),
+      useExternalHiveFile: Boolean = false)(
       queriesAndExpectedAnswers: (String, String)*): Unit = {
     val (queries, expectedAnswers) = queriesAndExpectedAnswers.unzip
     // Explicitly adds ENTER for each statement to make sure they are actually entered into the CLI.
     val queriesString = queries.map(_ + "\n").mkString
 
+    val extraHive = if (useExternalHiveFile) {
+      s"--driver-class-path ${System.getProperty("user.dir")}/src/test/noclasspath"
+    } else {
+      ""
+    }
+    val warehouseConf =
+      maybeWarehouse.map(dir => s"--hiveconf ${ConfVars.METASTOREWAREHOUSE}=$dir").getOrElse("")
     val command = {
       val cliScript = "../../bin/spark-sql".split("/").mkString(File.separator)
       val jdbcUrl = s"jdbc:derby:;databaseName=$metastorePath;create=true"
       s"""$cliScript
          |  --master local
          |  --driver-java-options -Dderby.system.durability=test
+         |  $extraHive
          |  --conf spark.ui.enabled=false
          |  --hiveconf ${ConfVars.METASTORECONNECTURLKEY}=$jdbcUrl
-         |  --hiveconf ${ConfVars.METASTOREWAREHOUSE}=$warehousePath
          |  --hiveconf ${ConfVars.SCRATCHDIR}=$scratchDirPath
          |  --hiveconf conf1=conftest
          |  --hiveconf conf2=1
+         |  $warehouseConf
        """.stripMargin.split("\\s+").toSeq ++ extraArgs
     }
@@ -159,6 +176,54 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
     }
   }
 
+  test("load warehouse dir from hive-site.xml") {
+    runCliWithin(1.minute, maybeWarehouse = None, useExternalHiveFile = true)(
+      "desc database default;" -> "hive_one",
+      "set spark.sql.warehouse.dir;" -> "hive_one")
+  }
+
+  test("load warehouse dir from --hiveconf") {
+    // --hiveconf overrides hive-site.xml
+    runCliWithin(2.minute, useExternalHiveFile = true)(
+      "desc database default;" -> warehousePath.getAbsolutePath,
+      "create database cliTestDb;" -> "",
+      "desc database cliTestDb;" -> warehousePath.getAbsolutePath,
+      "set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
+  }
+
+  test("load warehouse dir from --conf spark(.hadoop).hive.*") {
+    // override conf from hive-site.xml
+    runCliWithin(
+      2.minute,
+      extraArgs = Seq("--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$sparkWareHouseDir"),
+      maybeWarehouse = None,
+      useExternalHiveFile = true)(
+      "desc database default;" -> sparkWareHouseDir.getAbsolutePath,
+      "create database cliTestDb;" -> "",
+      "desc database cliTestDb;" -> sparkWareHouseDir.getAbsolutePath,
+      "set spark.sql.warehouse.dir;" -> sparkWareHouseDir.getAbsolutePath)
+
+    // also override conf from --hiveconf
+    runCliWithin(
+      2.minute,
+      extraArgs = Seq("--conf", s"spark.${ConfVars.METASTOREWAREHOUSE}=$sparkWareHouseDir"))(
+      "desc database default;" -> sparkWareHouseDir.getAbsolutePath,
+      "create database cliTestDb;" -> "",
+      "desc database cliTestDb;" -> sparkWareHouseDir.getAbsolutePath,
+      "set spark.sql.warehouse.dir;" -> sparkWareHouseDir.getAbsolutePath)
+  }
+
+  test("load warehouse dir from spark.sql.warehouse.dir") {
+    // spark.sql.warehouse.dir overrides all Hive ones
+    runCliWithin(
+      2.minute,
+      extraArgs =
+        Seq("--conf",
+          s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}1",
+          "--conf", s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=${sparkWareHouseDir}2"))(
+      "desc database default;" -> sparkWareHouseDir.getAbsolutePath.concat("1"))
+  }
+
   test("Simple commands") {
     val dataFilePath =
       Thread.currentThread().getContextClassLoader.getResource("data/files/small_kv.txt")
@@ -308,19 +373,6 @@ class CliSuite extends SparkFunSuite with BeforeAndAfterAll with Logging {
     )
   }
 
-  test("SPARK-21451: spark.sql.warehouse.dir should respect options in --hiveconf") {
-    runCliWithin(1.minute)("set spark.sql.warehouse.dir;" -> warehousePath.getAbsolutePath)
-  }
-
-  test("SPARK-21451: Apply spark.hadoop.* configurations") {
-    val tmpDir = Utils.createTempDir(namePrefix = "SPARK-21451")
-    runCliWithin(
-      1.minute,
-      Seq("--conf",
s"spark.hadoop.${ConfVars.METASTOREWAREHOUSE}=$tmpDir"))( - "set spark.sql.warehouse.dir;" -> tmpDir.getAbsolutePath) - tmpDir.delete() - } - test("Support hive.aux.jars.path") { val hiveContribJar = HiveTestJars.getHiveContribJar().getCanonicalPath runCliWithin( diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala index 6e2dcfc..78535b0 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSharedStateSuite.scala @@ -45,7 +45,6 @@ class HiveSharedStateSuite extends SparkFunSuite { GLOBAL_TEMP_DATABASE.key -> tmpDb) val state = new SharedState(sc, initialConfigs) - assert(state.warehousePath !== invalidPath, "warehouse path can't determine by session options") assert(sc.conf.get(WAREHOUSE_PATH.key) !== invalidPath, "warehouse conf in session options can't affect application wide spark conf") assert(sc.hadoopConfiguration.get(ConfVars.METASTOREWAREHOUSE.varname) !== invalidPath, diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala index 31ff62e..8b97489 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSparkSubmitSuite.scala @@ -787,7 +787,7 @@ object SPARK_18360 { .enableHiveSupport().getOrCreate() val defaultDbLocation = spark.catalog.getDatabase("default").locationUri - assert(new Path(defaultDbLocation) == new Path(spark.sharedState.warehousePath)) + assert(new Path(defaultDbLocation) == new Path(spark.conf.get(WAREHOUSE_PATH))) val hiveClient = spark.sharedState.externalCatalog.unwrapped.asInstanceOf[HiveExternalCatalog].client --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org