This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 576c4046742  [SPARK-35242][SQL] Support changing session catalog's default database
576c4046742 is described below

commit 576c4046742e1fd5a0e93b87e9b29bac0df83788
Author: Gabor Roczei <roc...@gmail.com>
AuthorDate: Tue Sep 27 08:28:26 2022 +0800

    [SPARK-35242][SQL] Support changing session catalog's default database

### What changes were proposed in this pull request?

This PR is a follow-up to #32364, which was closed by github-actions because it had not been updated in a while. The previous PR added a new configuration parameter (spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase) to configure the session catalog's default database, which is required by use cases where the user does not have access to the default database. I have therefore created this new PR based on it, with the following additional changes:

- Rebased/updated the previous PR onto the latest master branch
- Deleted the DEFAULT_DATABASE static member from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala and refactored the related code

### Why are the changes needed?

If a user does not have any permissions on the Hive default database in Ranger, Spark fails with the following error:

```
22/08/26 18:36:21 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10@ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
    at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
    at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
    at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
    at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
```

The idea is to introduce a new configuration parameter that lets us set a different database as the default, one the user does have sufficient permissions on in Ranger. For example:

```
spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db
```
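The same setting can also be applied when creating a session programmatically. A minimal sketch, assuming `other_db` stands in for a database the user can access; since this is a static conf, it must be supplied before the SparkSession is created:

```scala
import org.apache.spark.sql.SparkSession

// Sketch: configure the session catalog's default database at session creation.
// spark.sql.catalog.spark_catalog.defaultDatabase is a static conf, so it cannot
// be changed on an already-running session ("other_db" is a placeholder name).
val spark = SparkSession.builder()
  .appName("SPARK-35242 example")
  .config("spark.sql.catalog.spark_catalog.defaultDatabase", "other_db")
  .enableHiveSupport()
  .getOrCreate()
```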
scala> spark.sql("use other") 22/08/26 18:18:47 INFO conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml 22/08/26 18:18:48 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist 22/08/26 18:18:48 WARN client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d 22/08/26 18:18:48 INFO SessionState: [main]: Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d 22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled. 22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083 22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection. 22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1 22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Connected to metastore. 22/08/26 18:18:50 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0 org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default]) at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110) at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223) at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150) at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144) at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:179) ``` This is the expected behavior because it will use the "default" db name. Scenario b) Use the "other" database where the hrt_10 user has proper permissions ``` [hrt_10quasar-thbnqr-2 ~]$ spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 22/08/26 18:27:03 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist 22/08/26 18:27:14 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-15]: Attempted to request executors before the AM has registered! ... scala> spark.sql("use other") 22/08/26 18:29:22 INFO conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml 22/08/26 18:29:22 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist 22/08/26 18:29:22 WARN client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2 22/08/26 18:29:22 INFO SessionState: [main]: Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2 22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled. 
Closes #37679 from roczei/SPARK-35242.

Lead-authored-by: Gabor Roczei <roc...@gmail.com>
Co-authored-by: hongdongdong <hongdongd...@cmss.chinamobile.com>
Co-authored-by: hongdd <jn_...@163.com>
Co-authored-by: Gabor Roczei <roc...@cloudera.com>
Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/resources/error/error-classes.json       |  5 +++++
 .../spark/sql/catalyst/catalog/SessionCatalog.scala    |  8 +++++---
 .../spark/sql/connector/catalog/CatalogManager.scala   |  4 ++--
 .../apache/spark/sql/errors/QueryExecutionErrors.scala |  8 ++++++++
 .../scala/org/apache/spark/sql/internal/SQLConf.scala  |  2 ++
 .../org/apache/spark/sql/internal/StaticSQLConf.scala  |  8 ++++++++
 .../execution/datasources/v2/V2SessionCatalog.scala    |  3 ++-
 .../org/apache/spark/sql/internal/SharedState.scala    | 18 ++++++++++++------
 .../apache/spark/sql/hive/thriftserver/CliSuite.scala  | 15 +++++++++++++++
 9 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a71b977fc51..9e909bcb6c9 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -206,6 +206,11 @@
     ],
     "sqlState" : "22008"
   },
+  "DEFAULT_DATABASE_NOT_EXISTS" : {
+    "message" : [
+      "Default database <defaultDatabase> does not exist, please create it first or change default database to 'default'."
+    ]
+  },
   "DIVIDE_BY_ZERO" : {
     "message" : [
       "Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set <config> to \"false\" to bypass this error."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 1ada2ffa4fc..cc2ba8ac7e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -68,7 +68,8 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL,
+    defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
 
@@ -88,7 +89,8 @@ class SessionCatalog(
       DummyFunctionResourceLoader,
       DummyFunctionExpressionBuilder,
       conf.tableRelationCacheSize,
-      conf.metadataCacheTTL)
+      conf.metadataCacheTTL,
+      conf.defaultDatabase)
   }
 
   // For testing only.
@@ -129,7 +131,7 @@ class SessionCatalog(
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = format(DEFAULT_DATABASE)
+  protected var currentDb: String = format(defaultDatabase)
 
   private val validNameFormat = "([\\w_]+)".r
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 62c0772f865..cf9dd7fdf47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -130,7 +130,7 @@ class CatalogManager(
       _currentNamespace = None
       // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
      // when we switch back to session catalog, the current namespace definitely is ["default"].
-      v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+      v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
     }
   }
 
@@ -144,7 +144,7 @@ class CatalogManager(
     catalogs.clear()
     _currentNamespace = None
     _currentCatalogName = None
-    v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+    v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
   }
 }
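With the two hunks above, both the initial current database and the database restored when switching back to the session catalog come from the conf rather than the hard-coded "default". A minimal sketch of the observable behavior, assuming a session started with defaultDatabase=other_db (a placeholder name):

```scala
// Assuming the session was started with
//   --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db
// the v1 session catalog should now start in the configured database:
println(spark.catalog.currentDatabase)  // expected: "other_db", not "default"
```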
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1a93014ecff..7c7561b3a71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1932,6 +1932,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
   }
 
+  def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = {
+    new SparkException(
+      errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
+      messageParameters = Map("defaultDatabase" -> defaultDatabase),
+      cause = null
+    )
+  }
+
   def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = {
     new SparkException(
       s"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bc6700a3b56..626c30cb9c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4705,6 +4705,8 @@ class SQLConf extends Serializable with Logging {
   def errorMessageFormat: ErrorMessageFormat.Value =
     ErrorMessageFormat.withName(getConf(SQLConf.ERROR_MESSAGE_FORMAT))
 
+  def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
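For reference, a standalone sketch of how the new error is meant to render; this mirrors the error-class template substitution rather than calling the private[sql] helper, and `other_db` is a placeholder:

```scala
// Mimic the <placeholder> substitution performed by Spark's error framework for
// the DEFAULT_DATABASE_NOT_EXISTS template defined in error-classes.json above.
val template = "Default database <defaultDatabase> does not exist, " +
  "please create it first or change default database to 'default'."
val params = Map("defaultDatabase" -> "other_db")
val rendered = params.foldLeft(template) { case (msg, (k, v)) => msg.replace(s"<$k>", v) }
println(s"[DEFAULT_DATABASE_NOT_EXISTS] $rendered")
```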
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 3be02f69f23..aaeac8ce6fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.util.Utils
 
@@ -37,6 +38,13 @@ object StaticSQLConf {
     .stringConf
     .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
+  val CATALOG_DEFAULT_DATABASE =
+    buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase")
+      .doc("The default database for session catalog.")
+      .version("3.4.0")
+      .stringConf
+      .createWithDefault("default")
+
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .version("2.0.0")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index efbc9dd7558..b9afe71d243 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
   import V2SessionCatalog._
 
-  override val defaultNamespace: Array[String] = Array("default")
+  override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)
 
   override def name: String = CatalogManager.SESSION_CATALOG_NAME
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index f6b748d2424..92c3ec888d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -148,13 +148,19 @@ private[sql] class SharedState(
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
-    val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE,
-      "default database",
-      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
-      Map())
     // Create default database if it doesn't exist
-    if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
+    // If database name not equals 'default', throw exception
+    if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
+      if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
+        throw QueryExecutionErrors.defaultDatabaseNotExistsError(
+          SQLConf.get.defaultDatabase
+        )
+      }
+      val defaultDbDefinition = CatalogDatabase(
+        SQLConf.get.defaultDatabase,
+        "default database",
+        CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
+        Map())
       // There may be another Spark application creating default database at the same time, here we
       // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
       externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
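To summarize the SharedState change above: a missing default database is only auto-created when it is named "default"; any other missing name now fails fast. A minimal standalone sketch of that rule, with stand-in functions for the external catalog calls (the exception type is also a stand-in for the SparkException raised by the new helper):

```scala
// Stand-ins for externalCatalog.databaseExists / createDatabase; this mirrors
// the bootstrap logic, it is not the actual SharedState code.
def ensureDefaultDatabase(
    configuredDefault: String,
    databaseExists: String => Boolean,
    createDatabase: String => Unit): Unit = {
  if (!databaseExists(configuredDefault)) {
    if (!"default".equalsIgnoreCase(configuredDefault)) {
      // A customized default database must be created up front by the user.
      throw new IllegalStateException(
        s"Default database $configuredDefault does not exist, " +
          "please create it first or change default database to 'default'.")
    }
    // Only the built-in "default" database is created on demand
    // (idempotently, since another application may create it concurrently).
    createDatabase(configuredDefault)
  }
}
```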
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index aa7b8876486..fb5beb60c5c 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -772,4 +772,19 @@ class CliSuite extends SparkFunSuite {
     }
   }
   // scalastyle:on line.size.limit
+
+  test("SPARK-35242: Support change catalog default database for spark") {
+    // Create db and table first
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))(
+      "create database spark_35242;" -> "",
+      "use spark_35242;" -> "",
+      "CREATE TABLE spark_test(key INT, val STRING);" -> "")
+
+    // Set default db
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}",
+        "--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))(
+      "show tables;" -> "spark_test")
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org