This is an automated email from the ASF dual-hosted git repository. wenchen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new ca5f7e6c35d [SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace ca5f7e6c35d is described below commit ca5f7e6c35d49e9599c39fcd0828b3e557848d11 Author: Rui Wang <rui.w...@databricks.com> AuthorDate: Tue Jun 21 11:18:36 2022 +0800 [SPARK-39263][SQL] Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace ### What changes were proposed in this pull request? Make GetTable, TableExists and DatabaseExists be compatible with 3 layer namespace ### Why are the changes needed? This is a part of effort to make catalog API be compatible with 3 layer namespace ### Does this PR introduce _any_ user-facing change? Yes. The API change here is backward compatible and it extends the API to further support 3 layer namespace (e.g. catalog.database.table). ### How was this patch tested? UT Closes #36641 from amaliujia/catalogapi2. Authored-by: Rui Wang <rui.w...@databricks.com> Signed-off-by: Wenchen Fan <wenc...@databricks.com> --- .../sql/catalyst/catalog/SessionCatalog.scala | 2 +- .../apache/spark/sql/internal/CatalogImpl.scala | 55 ++++++++++++++++--- .../apache/spark/sql/internal/CatalogSuite.scala | 64 +++++++++++++++++++++- 3 files changed, 112 insertions(+), 9 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index 0152f49c798..54959b523c9 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -966,7 +966,7 @@ class SessionCatalog( } def isGlobalTempViewDB(dbName: String): Boolean = { - globalTempViewManager.database.equals(dbName) + globalTempViewManager.database.equalsIgnoreCase(dbName) } def lookupTempView(name: TableIdentifier): 
Option[View] = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala index 4b6ea33f3e6..f89a87c3011 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala @@ -23,12 +23,13 @@ import scala.util.control.NonFatal import org.apache.spark.sql._ import org.apache.spark.sql.catalog.{Catalog, Column, Database, Function, Table} import org.apache.spark.sql.catalyst.{DefinedByConstructorParams, FunctionIdentifier, TableIdentifier} -import org.apache.spark.sql.catalyst.analysis.{ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView} +import org.apache.spark.sql.catalyst.analysis.{ResolvedNamespace, ResolvedTable, ResolvedView, UnresolvedDBObjectName, UnresolvedNamespace, UnresolvedTable, UnresolvedTableOrView} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, RecoverPartitions, ShowTables, SubqueryAlias, TableSpec, View} import org.apache.spark.sql.catalyst.util.CharVarcharUtils -import org.apache.spark.sql.connector.catalog.{CatalogManager, TableCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, SupportsNamespaces, TableCatalog} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.types.StructType @@ -250,8 +251,26 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * table/view. This throws an `AnalysisException` when no `Table` can be found. 
*/ override def getTable(tableName: String): Table = { - val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - getTable(tableIdent.database.orNull, tableIdent.table) + // calling `sqlParser.parseTableIdentifier` to parse tableName. If it contains only table name + // and optionally contains a database name(thus a TableIdentifier), then we look up the table in + // sessionCatalog. Otherwise we try `sqlParser.parseMultipartIdentifier` to have a sequence of + // string as the qualified identifier and resolve the table through SQL analyzer. + try { + val ident = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + if (tableExists(ident.database.orNull, ident.table)) { + makeTable(ident) + } else { + getTable3LNamespace(tableName) + } + } catch { + case e: org.apache.spark.sql.catalyst.parser.ParseException => + getTable3LNamespace(tableName) + } + } + + private def getTable3LNamespace(tableName: String): Table = { + val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) + makeTable(ident) } /** @@ -287,7 +306,21 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * Checks if the database with the specified name exists. */ override def databaseExists(dbName: String): Boolean = { - sessionCatalog.databaseExists(dbName) + // To maintain backwards compatibility, we first treat the input as a simple dbName and check + // if sessionCatalog contains it. If not, we try to parse it as a 3 part name. If the parsed + // identifier contains both catalog name and database name, we then search the database in the + // catalog. 
+ if (!sessionCatalog.databaseExists(dbName)) { + val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(dbName) + val plan = sparkSession.sessionState.executePlan(UnresolvedNamespace(ident)).analyzed + plan match { + case ResolvedNamespace(catalog: SupportsNamespaces, _) => + catalog.namespaceExists(ident.slice(1, ident.size).toArray) + case _ => true + } + } else { + true + } } /** @@ -295,8 +328,16 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog { * view or a table/view. */ override def tableExists(tableName: String): Boolean = { - val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) - tableExists(tableIdent.database.orNull, tableIdent.table) + try { + val tableIdent = sparkSession.sessionState.sqlParser.parseTableIdentifier(tableName) + tableExists(tableIdent.database.orNull, tableIdent.table) + } catch { + case e: org.apache.spark.sql.catalyst.parser.ParseException => + val ident = sparkSession.sessionState.sqlParser.parseMultipartIdentifier(tableName) + val catalog = + sparkSession.sessionState.catalogManager.catalog(ident(0)).asTableCatalog + catalog.tableExists(Identifier.of(Array(ident(1)), ident(2))) + } } /** diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala index 06db60676ef..4844884f693 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/CatalogSuite.scala @@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression import org.apache.spark.sql.catalyst.plans.logical.Range import org.apache.spark.sql.connector.FakeV2Provider -import org.apache.spark.sql.connector.catalog.{Identifier, InMemoryCatalog} +import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, InMemoryCatalog} import 
org.apache.spark.sql.connector.catalog.CatalogV2Implicits.CatalogHelper import org.apache.spark.sql.test.SharedSparkSession import org.apache.spark.sql.types.StructType @@ -681,4 +681,66 @@ class CatalogSuite extends SharedSparkSession with AnalysisTest with BeforeAndAf assert(spark.catalog.listTables("default").collect().map(_.name).toSet == Set("my_table1", "my_table2", "my_temp_table")) } + + test("three layer namespace compatibility - get table") { + val catalogName = "testcat" + val dbName = "default" + val tableName = "my_table" + val tableSchema = new StructType().add("i", "int") + val description = "this is a test table" + + spark.catalog.createTable( + tableName = Array(catalogName, dbName, tableName).mkString("."), + source = classOf[FakeV2Provider].getName, + schema = tableSchema, + description = description, + options = Map.empty[String, String]) + + val t = spark.catalog.getTable(Array(catalogName, dbName, tableName).mkString(".")) + val expectedTable = + new Table( + tableName, + catalogName, + Array(dbName), + description, + CatalogTableType.MANAGED.name, + false) + assert(expectedTable.toString == t.toString) + + // test when both sessionCatalog and testcat contain tables with the same name, and we expect + // the table in sessionCatalog to be returned when a 2 part name is used. 
+ createTable("my_table") + val t2 = spark.catalog.getTable(Array(dbName, tableName).mkString(".")) + assert(t2.catalog == CatalogManager.SESSION_CATALOG_NAME) + } + + test("three layer namespace compatibility - table exists") { + val catalogName = "testcat" + val dbName = "my_db" + val tableName = "my_table" + val tableSchema = new StructType().add("i", "int") + + assert(!spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString("."))) + + spark.catalog.createTable( + tableName = Array(catalogName, dbName, tableName).mkString("."), + source = classOf[FakeV2Provider].getName, + schema = tableSchema, + description = "", + options = Map.empty[String, String]) + + assert(spark.catalog.tableExists(Array(catalogName, dbName, tableName).mkString("."))) + } + + test("three layer namespace compatibility - database exists") { + val catalogName = "testcat" + val dbName = "my_db" + assert(!spark.catalog.databaseExists(Array(catalogName, dbName).mkString("."))) + + sql(s"CREATE NAMESPACE ${catalogName}.${dbName}") + assert(spark.catalog.databaseExists(Array(catalogName, dbName).mkString("."))) + + val catalogName2 = "catalog_not_exists" + assert(!spark.catalog.databaseExists(Array(catalogName2, dbName).mkString("."))) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org