This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new cbb4e7da692 [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace
cbb4e7da692 is described below

commit cbb4e7da6924f62a6b5272a5684faac9f132fcfd
Author: Zach Schuermann <zachary....@gmail.com>
AuthorDate: Sat Jul 2 08:26:40 2022 +0800

    [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace
    
    ### What changes were proposed in this pull request?
    
    Change `setCurrentDatabase` catalog API to support 3 layer namespace. We use `sparkSession.sessionState.catalogManager.currentNamespace` for the currentDatabase now.
    
    ### Why are the changes needed?
    
    `setCurrentDatabase` doesn't support 3 layer namespace.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. This PR introduces a backwards-compatible API change to support 3 layer namespace (e.g. catalog.database.table) for `setCurrentDatabase`.
    
    ### How was this patch tested?
    
    UT
    
    Closes #36969 from schuermannator/3l-setCurrentDatabse.
    
    Authored-by: Zach Schuermann <zachary....@gmail.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 R/pkg/tests/fulltests/test_sparkSQL.R              |  4 +--
 .../apache/spark/sql/internal/CatalogImpl.scala    |  9 ++++---
 .../apache/spark/sql/internal/CatalogSuite.scala   | 29 ++++++++++++++++++++++
 3 files changed, 37 insertions(+), 5 deletions(-)

diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R
index 2acb7a9ceba..0f984d0022a 100644
--- a/R/pkg/tests/fulltests/test_sparkSQL.R
+++ b/R/pkg/tests/fulltests/test_sparkSQL.R
@@ -4015,8 +4015,8 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", {
   expect_equal(currentDatabase(), "default")
   expect_error(setCurrentDatabase("default"), NA)
   expect_error(setCurrentDatabase("zxwtyswklpf"),
-               paste0("Error in setCurrentDatabase : analysis error - Database ",
-               "'zxwtyswklpf' does not exist"))
+               paste0("Error in setCurrentDatabase : no such database - Database ",
+               "'zxwtyswklpf' not found"))
   dbs <- collect(listDatabases())
   expect_equal(names(dbs), c("name", "catalog", "description", "locationUri"))
   expect_equal(which(dbs[, 1] == "default"), 1)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 742ca5ccb1e..c276fbb677c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -59,15 +59,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   /**
    * Returns the current default database in this session.
    */
-  override def currentDatabase: String = sessionCatalog.getCurrentDatabase
+  override def currentDatabase: String =
+    sparkSession.sessionState.catalogManager.currentNamespace.toSeq.quoted
 
   /**
    * Sets the current default database in this session.
    */
   @throws[AnalysisException]("database does not exist")
   override def setCurrentDatabase(dbName: String): Unit = {
-    requireDatabaseExists(dbName)
-    sessionCatalog.setCurrentDatabase(dbName)
+    // we assume dbName will not include the catalog prefix. e.g. if you call
+    // setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog.
+    val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName)
+    sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray)
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
index 6e6138c91dd..7e4933b3407 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala
@@ -866,4 +866,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf
     sql(s"CREATE NAMESPACE $qualified")
     assert(spark.catalog.getDatabase(qualified).name === db)
   }
+
+  test("three layer namespace compatibility - set current database") {
+    spark.catalog.setCurrentCatalog("testcat")
+    // namespace with the same name as catalog
+    sql("CREATE NAMESPACE testcat.testcat.my_db")
+    spark.catalog.setCurrentDatabase("testcat.my_db")
+    assert(spark.catalog.currentDatabase == "testcat.my_db")
+    // sessionCatalog still reports 'default' as current database
+    assert(sessionCatalog.getCurrentDatabase == "default")
+    val e = intercept[AnalysisException] {
+      spark.catalog.setCurrentDatabase("my_db")
+    }.getMessage
+    assert(e.contains("my_db"))
+
+    // check that we can fall back to old sessionCatalog
+    createDatabase("hive_db")
+    val err = intercept[AnalysisException] {
+      spark.catalog.setCurrentDatabase("hive_db")
+    }.getMessage
+    assert(err.contains("hive_db"))
+    spark.catalog.setCurrentCatalog("spark_catalog")
+    spark.catalog.setCurrentDatabase("hive_db")
+    assert(spark.catalog.currentDatabase == "hive_db")
+    assert(sessionCatalog.getCurrentDatabase == "hive_db")
+    val e3 = intercept[AnalysisException] {
+      spark.catalog.setCurrentDatabase("unknown_db")
+    }.getMessage
+    assert(e3.contains("unknown_db"))
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to