This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 576c4046742 [SPARK-35242][SQL] Support changing session catalog's default database
576c4046742 is described below

commit 576c4046742e1fd5a0e93b87e9b29bac0df83788
Author: Gabor Roczei <roc...@gmail.com>
AuthorDate: Tue Sep 27 08:28:26 2022 +0800

    [SPARK-35242][SQL] Support changing session catalog's default database
    
    ### What changes were proposed in this pull request?
    
    This PR is a follow-up to #32364, which was closed by github-actions because it had not been updated in a while. The previous PR added a new configuration parameter (spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase) to change the session catalog's default database, which is needed for use cases where the user does not have access to the default database.
    
    Therefore I have created a new PR based on it and made the following additional changes:
    
    - Rebased/updated the previous PR onto the latest master branch
    - Deleted the DEFAULT_DATABASE static member from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala and refactored the code accordingly (see the sketch below)
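    
    A simplified, illustrative sketch of that refactor (an excerpt for readability only; the real change is in the SessionCatalog.scala hunk of the diff below):
    
    ```scala
    // Excerpt: the default database now comes from the static conf
    // (SQLConf.get.defaultDatabase) rather than a hard-coded DEFAULT_DATABASE constant.
    class SessionCatalog(
        // ...other constructor parameters elided...
        defaultDatabase: String = SQLConf.get.defaultDatabase)
      extends SQLConfHelper with Logging {
    
      // The session starts in the configured default database.
      protected var currentDb: String = format(defaultDatabase)
    }
    ```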
    
    ### Why are the changes needed?
    
    If the user does not have any permissions on the Hive default database in Ranger, Spark fails with the following error:
    
    ```
    22/08/26 18:36:21 INFO  metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0
    org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
      at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
      at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
      at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
      at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
    ```
    The idea is to introduce a new configuration parameter that lets us set a different database name as the default database, one for which the user has sufficient permissions in Ranger.
    
    For example:
    
    ```spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db```
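    
    For reference, a minimal programmatic sketch of the same setting (my own illustration, not part of this change): since this is a static SQL conf, it has to be supplied before the SparkSession is created, and `other_db` is just a placeholder database name.
    
    ```scala
    import org.apache.spark.sql.SparkSession
    
    // Static conf: must be set at session construction time; it cannot be
    // changed on an already-running session. "other_db" is a placeholder.
    val spark = SparkSession.builder()
      .appName("default-db-example")
      .enableHiveSupport()
      .config("spark.sql.catalog.spark_catalog.defaultDatabase", "other_db")
      .getOrCreate()
    
    // The session catalog should now start in "other_db" instead of "default".
    println(spark.catalog.currentDatabase)
    ```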
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, there is a new configuration parameter as mentioned above, but its default value is "default", so existing behavior is unchanged unless it is set.
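    
    A quick way to check the value (a sketch of what I would expect in spark-shell when the conf is not set; the key comes from the new StaticSQLConf entry in the diff below):
    
    ```
    scala> spark.conf.get("spark.sql.catalog.spark_catalog.defaultDatabase")
    res0: String = default
    ```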
    
    ### How was this patch tested?
    
    1) With GitHub Actions (all tests passed)
    
    https://github.com/roczei/spark/actions/runs/2934863118
    
    2) Manually tested with Ranger + Hive
    
    Scenario a) hrt_10 does not have access to the default database in Hive:
    
    ```
    [hrt_10quasar-thbnqr-2 ~]$ spark-shell
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/08/26 18:14:18 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
    22/08/26 18:14:30 WARN  cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-17]: Attempted to request executors before the AM has registered!

    ...

    scala> spark.sql("use other")
    22/08/26 18:18:47 INFO  conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
    22/08/26 18:18:48 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
    22/08/26 18:18:48 WARN  client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
    Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
    22/08/26 18:18:48 INFO  SessionState: [main]: Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
    22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
    22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
    22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
    22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
    22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: Connected to metastore.
    22/08/26 18:18:50 INFO  metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0
    org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
      at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
      at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
      at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
      at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
      at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:179)
    ```
    
    This is the expected behavior, because without the new configuration the "default" database name is still used.
    
    Scenario b) Use the "other" database, where the hrt_10 user has the proper permissions:
    
    ```
    [hrt_10quasar-thbnqr-2 ~]$ spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    22/08/26 18:27:03 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
    22/08/26 18:27:14 WARN  cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-15]: Attempted to request executors before the AM has registered!
    
    ...
    
    scala> spark.sql("use other")
    22/08/26 18:29:22 INFO  conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
    22/08/26 18:29:22 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
    22/08/26 18:29:22 WARN  client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
    Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
    22/08/26 18:29:22 INFO  SessionState: [main]: Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
    22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
    22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
    22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
    22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
    22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: Connected to metastore.
    22/08/26 18:29:24 INFO  metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient ugi=hrt_10ROOT.HWX.SITE (auth:KERBEROS) retries=1 delay=1 lifetime=0
    res0: org.apache.spark.sql.DataFrame = []
    
    scala> spark.sql("select * from employee").show()
    +---+----+------+-----------+
    |eid|name|salary|destination|
    +---+----+------+-----------+
    | 12| Ram|    10|     Szeged|
    | 13| Joe|    20|   Debrecen|
    +---+----+------+-----------+
    
    scala>
    ```
    
    Closes #37679 from roczei/SPARK-35242.
    
    Lead-authored-by: Gabor Roczei <roc...@gmail.com>
    Co-authored-by: hongdongdong <hongdongd...@cmss.chinamobile.com>
    Co-authored-by: hongdd <jn_...@163.com>
    Co-authored-by: Gabor Roczei <roc...@cloudera.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 core/src/main/resources/error/error-classes.json       |  5 +++++
 .../spark/sql/catalyst/catalog/SessionCatalog.scala    |  8 +++++---
 .../spark/sql/connector/catalog/CatalogManager.scala   |  4 ++--
 .../apache/spark/sql/errors/QueryExecutionErrors.scala |  8 ++++++++
 .../scala/org/apache/spark/sql/internal/SQLConf.scala  |  2 ++
 .../org/apache/spark/sql/internal/StaticSQLConf.scala  |  8 ++++++++
 .../execution/datasources/v2/V2SessionCatalog.scala    |  3 ++-
 .../org/apache/spark/sql/internal/SharedState.scala    | 18 ++++++++++++------
 .../apache/spark/sql/hive/thriftserver/CliSuite.scala  | 15 +++++++++++++++
 9 files changed, 59 insertions(+), 12 deletions(-)

diff --git a/core/src/main/resources/error/error-classes.json b/core/src/main/resources/error/error-classes.json
index a71b977fc51..9e909bcb6c9 100644
--- a/core/src/main/resources/error/error-classes.json
+++ b/core/src/main/resources/error/error-classes.json
@@ -206,6 +206,11 @@
     ],
     "sqlState" : "22008"
   },
+  "DEFAULT_DATABASE_NOT_EXISTS" : {
+    "message" : [
+      "Default database <defaultDatabase> does not exist, please create it 
first or change default database to 'default'."
+    ]
+  },
   "DIVIDE_BY_ZERO" : {
     "message" : [
       "Division by zero. Use `try_divide` to tolerate divisor being 0 and 
return NULL instead. If necessary set <config> to \"false\" to bypass this 
error."
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 1ada2ffa4fc..cc2ba8ac7e4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -68,7 +68,8 @@ class SessionCatalog(
     functionResourceLoader: FunctionResourceLoader,
     functionExpressionBuilder: FunctionExpressionBuilder,
     cacheSize: Int = SQLConf.get.tableRelationCacheSize,
-    cacheTTL: Long = SQLConf.get.metadataCacheTTL) extends SQLConfHelper with Logging {
+    cacheTTL: Long = SQLConf.get.metadataCacheTTL,
+    defaultDatabase: String = SQLConf.get.defaultDatabase) extends SQLConfHelper with Logging {
   import SessionCatalog._
   import CatalogTypes.TablePartitionSpec
 
@@ -88,7 +89,8 @@ class SessionCatalog(
       DummyFunctionResourceLoader,
       DummyFunctionExpressionBuilder,
       conf.tableRelationCacheSize,
-      conf.metadataCacheTTL)
+      conf.metadataCacheTTL,
+      conf.defaultDatabase)
   }
 
   // For testing only.
@@ -129,7 +131,7 @@ class SessionCatalog(
   // check whether the temporary view or function exists, then, if not, operate on
   // the corresponding item in the current database.
   @GuardedBy("this")
-  protected var currentDb: String = format(DEFAULT_DATABASE)
+  protected var currentDb: String = format(defaultDatabase)
 
   private val validNameFormat = "([\\w_]+)".r
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 62c0772f865..cf9dd7fdf47 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -130,7 +130,7 @@ class CatalogManager(
       _currentNamespace = None
       // Reset the current database of v1 `SessionCatalog` when switching current catalog, so that
       // when we switch back to session catalog, the current namespace definitely is ["default"].
-      v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+      v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
     }
   }
 
@@ -144,7 +144,7 @@ class CatalogManager(
     catalogs.clear()
     _currentNamespace = None
     _currentCatalogName = None
-    v1SessionCatalog.setCurrentDatabase(SessionCatalog.DEFAULT_DATABASE)
+    v1SessionCatalog.setCurrentDatabase(conf.defaultDatabase)
   }
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
index 1a93014ecff..7c7561b3a71 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala
@@ -1932,6 +1932,14 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase {
     new UnsupportedOperationException(s"$nodeName does not implement doExecuteBroadcast")
   }
 
+  def defaultDatabaseNotExistsError(defaultDatabase: String): Throwable = {
+    new SparkException(
+      errorClass = "DEFAULT_DATABASE_NOT_EXISTS",
+      messageParameters = Map("defaultDatabase" -> defaultDatabase),
+      cause = null
+    )
+  }
+
   def databaseNameConflictWithSystemPreservedDatabaseError(globalTempDB: String): Throwable = {
     new SparkException(
       s"""
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index bc6700a3b56..626c30cb9c2 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -4705,6 +4705,8 @@ class SQLConf extends Serializable with Logging {
   def errorMessageFormat: ErrorMessageFormat.Value =
     ErrorMessageFormat.withName(getConf(SQLConf.ERROR_MESSAGE_FORMAT))
 
+  def defaultDatabase: String = getConf(StaticSQLConf.CATALOG_DEFAULT_DATABASE)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
index 3be02f69f23..aaeac8ce6fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.internal
 import java.util.Locale
 import java.util.concurrent.TimeUnit
 
+import org.apache.spark.sql.connector.catalog.CatalogManager.SESSION_CATALOG_NAME
 import org.apache.spark.util.Utils
 
 
@@ -37,6 +38,13 @@ object StaticSQLConf {
     .stringConf
     .createWithDefault(Utils.resolveURI("spark-warehouse").toString)
 
+  val CATALOG_DEFAULT_DATABASE =
+    buildStaticConf(s"spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase")
+    .doc("The default database for session catalog.")
+    .version("3.4.0")
+    .stringConf
+    .createWithDefault("default")
+
   val CATALOG_IMPLEMENTATION = buildStaticConf("spark.sql.catalogImplementation")
     .internal()
     .version("2.0.0")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index efbc9dd7558..b9afe71d243 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.connector.V1Function
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
   import V2SessionCatalog._
 
-  override val defaultNamespace: Array[String] = Array("default")
+  override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)
 
   override def name: String = CatalogManager.SESSION_CATALOG_NAME
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index f6b748d2424..92c3ec888d6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -148,13 +148,19 @@ private[sql] class SharedState(
     val externalCatalog = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
       SharedState.externalCatalogClassName(conf), conf, hadoopConf)
 
-    val defaultDbDefinition = CatalogDatabase(
-      SessionCatalog.DEFAULT_DATABASE,
-      "default database",
-      CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
-      Map())
     // Create default database if it doesn't exist
-    if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
+    // If database name not equals 'default', throw exception
+    if (!externalCatalog.databaseExists(SQLConf.get.defaultDatabase)) {
+      if (!SessionCatalog.DEFAULT_DATABASE.equalsIgnoreCase(SQLConf.get.defaultDatabase)) {
+        throw QueryExecutionErrors.defaultDatabaseNotExistsError(
+          SQLConf.get.defaultDatabase
+        )
+      }
+      val defaultDbDefinition = CatalogDatabase(
+        SQLConf.get.defaultDatabase,
+        "default database",
+        CatalogUtils.stringToURI(conf.get(WAREHOUSE_PATH)),
+        Map())
       // There may be another Spark application creating default database at the same time, here we
       // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
       externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
index aa7b8876486..fb5beb60c5c 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/CliSuite.scala
@@ -772,4 +772,19 @@ class CliSuite extends SparkFunSuite {
     }
   }
   // scalastyle:on line.size.limit
+
+  test("SPARK-35242: Support change catalog default database for spark") {
+    // Create db and table first
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}"))(
+      "create database spark_35242;" -> "",
+      "use spark_35242;" -> "",
+      "CREATE TABLE spark_test(key INT, val STRING);" -> "")
+
+    // Set default db
+    runCliWithin(2.minute,
+      Seq("--conf", s"${StaticSQLConf.WAREHOUSE_PATH.key}=${sparkWareHouseDir}",
+          "--conf", s"${StaticSQLConf.CATALOG_DEFAULT_DATABASE.key}=spark_35242"))(
+      "show tables;" -> "spark_test")
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
