This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new cbb4e7da692 [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace cbb4e7da692 is described below commit cbb4e7da6924f62a6b5272a5684faac9f132fcfd Author: Zach Schuermann <zachary....@gmail.com> AuthorDate: Sat Jul 2 08:26:40 2022 +0800 [SPARK-39646][SQL] Make setCurrentDatabase compatible with 3 layer namespace ### What changes were proposed in this pull request? Change the `setCurrentDatabase` catalog API to support the 3-layer namespace. `setCurrentDatabase` now parses its argument as a multipart identifier and sets `sparkSession.sessionState.catalogManager.currentNamespace`; `currentDatabase` now reads the current database from that same `currentNamespace`. ### Why are the changes needed? `setCurrentDatabase` did not support the 3-layer namespace. ### Does this PR introduce _any_ user-facing change? Yes. This PR introduces a backwards-compatible API change to support the 3-layer namespace (e.g. catalog.database.table) for `setCurrentDatabase`. The error message for a nonexistent database also changes, from "analysis error - Database '...' does not exist" to "no such database - Database '...' not found", as reflected in the updated R test. ### How was this patch tested? Unit tests (UT). Closes #36969 from schuermannator/3l-setCurrentDatabse. 
Authored-by: Zach Schuermann <zachary....@gmail.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- R/pkg/tests/fulltests/test_sparkSQL.R | 4 +-- .../apache/spark/sql/internal/CatalogImpl.scala | 9 ++++--- .../apache/spark/sql/internal/CatalogSuite.scala | 29 ++++++++++++++++++++++ 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/R/pkg/tests/fulltests/test_sparkSQL.R b/R/pkg/tests/fulltests/test_sparkSQL.R index 2acb7a9ceba..0f984d0022a 100644 --- a/R/pkg/tests/fulltests/test_sparkSQL.R +++ b/R/pkg/tests/fulltests/test_sparkSQL.R @@ -4015,8 +4015,8 @@ test_that("catalog APIs, currentDatabase, setCurrentDatabase, listDatabases", { expect_equal(currentDatabase(), "default") expect_error(setCurrentDatabase("default"), NA) expect_error(setCurrentDatabase("zxwtyswklpf"), - paste0("Error in setCurrentDatabase : analysis error - Database ", - "'zxwtyswklpf' does not exist")) + paste0("Error in setCurrentDatabase : no such database - Database ", + "'zxwtyswklpf' not found")) dbs <- collect(listDatabases()) expect_equal(names(dbs), c("name", "catalog", "description", "locationUri")) expect_equal(which(dbs[, 1] == "default"), 1) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 742ca5ccb1e..c276fbb677c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -59,15 +59,18 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { /** * Returns the current default database in this session. */ - override def currentDatabase: String = sessionCatalog.getCurrentDatabase + override def currentDatabase: String = + sparkSession.sessionState.catalogManager.currentNamespace.toSeq.quoted /** * Sets the current default database in this session. 
*/ @throws[AnalysisException]("database does not exist") override def setCurrentDatabase(dbName: String): Unit = { - requireDatabaseExists(dbName) - sessionCatalog.setCurrentDatabase(dbName) + // we assume dbName will not include the catalog prefix. e.g. if you call + // setCurrentDatabase("catalog.db") it will search for a database catalog.db in the catalog. + val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) + sparkSession.sessionState.catalogManager.setCurrentNamespace(ident.toArray) } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 6e6138c91dd..7e4933b3407 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -866,4 +866,33 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf sql(s"CREATE NAMESPACE $qualified") assert(spark.catalog.getDatabase(qualified).name === db) } + + test("three layer namespace compatibility - set current database") { + spark.catalog.setCurrentCatalog("testcat") + // namespace with the same name as catalog + sql("CREATE NAMESPACE testcat.testcat.my_db") + spark.catalog.setCurrentDatabase("testcat.my_db") + assert(spark.catalog.currentDatabase == "testcat.my_db") + // sessionCatalog still reports 'default' as current database + assert(sessionCatalog.getCurrentDatabase == "default") + val e = intercept[AnalysisException] { + spark.catalog.setCurrentDatabase("my_db") + }.getMessage + assert(e.contains("my_db")) + + // check that we can fall back to old sessionCatalog + createDatabase("hive_db") + val err = intercept[AnalysisException] { + spark.catalog.setCurrentDatabase("hive_db") + }.getMessage + assert(err.contains("hive_db")) + spark.catalog.setCurrentCatalog("spark_catalog") + spark.catalog.setCurrentDatabase("hive_db") + 
assert(spark.catalog.currentDatabase == "hive_db") + assert(sessionCatalog.getCurrentDatabase == "hive_db") + val e3 = intercept[AnalysisException] { + spark.catalog.setCurrentDatabase("unknown_db") + }.getMessage + assert(e3.contains("unknown_db")) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org