This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 126b74a792c [SPARK-46055][SQL] Rewrite Catalog Database APIs implementation
126b74a792c is described below

commit 126b74a792c3b316686c7677704d5aef85ce9cbb
Author: Yihong He <heyihong...@gmail.com>
AuthorDate: Wed Nov 29 01:33:11 2023 +0100

    [SPARK-46055][SQL] Rewrite Catalog Database APIs implementation
    
    ### What changes were proposed in this pull request?
    
    - Rewrite the Catalog Database APIs implementation to use ResolvedNamespace to get metadata, instead of calling the catalog directly
    - Add a boolean flag `fetchMetadata` in `UnresolvedNamespace`
    - Add a metadata field in `ResolvedNamespace` (a simplified sketch of the resulting flow is shown below)
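
    A minimal, self-contained Scala sketch of the new resolution flow. The `CatalogPlugin`/`SupportsNamespaces` traits below are simplified stand-ins for the real Spark connector interfaces (whose `loadNamespaceMetadata` returns a `java.util.Map`), so this illustrates the technique rather than the actual implementation:

        // Simplified stand-ins for the Spark connector interfaces.
        trait CatalogPlugin { def name: String }
        trait SupportsNamespaces extends CatalogPlugin {
          def loadNamespaceMetadata(namespace: Array[String]): Map[String, String]
        }

        // The unresolved plan node now carries a flag asking the analyzer
        // to fetch namespace metadata during resolution.
        case class UnresolvedNamespace(
            multipartIdentifier: Seq[String],
            fetchMetadata: Boolean = false)

        // The resolved node carries the metadata, empty unless requested.
        case class ResolvedNamespace(
            catalog: CatalogPlugin,
            namespace: Seq[String],
            metadata: Map[String, String] = Map.empty)

        // Metadata is loaded only when the plan asked for it and the
        // catalog can serve it; otherwise it stays empty.
        def resolveNamespace(
            catalog: CatalogPlugin,
            ns: Seq[String],
            fetchMetadata: Boolean): ResolvedNamespace = catalog match {
          case s: SupportsNamespaces if fetchMetadata =>
            ResolvedNamespace(catalog, ns, s.loadNamespaceMetadata(ns.toArray))
          case _ =>
            ResolvedNamespace(catalog, ns)
        }

        // Example: a plan node that wants metadata resolved during analysis.
        val plan = UnresolvedNamespace(Seq("testcat", "ns"), fetchMetadata = true)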
    
    ### Why are the changes needed?
    
    - Less duplicated logic. We should not invoke catalog APIs directly, but go through the analyzer.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Existing tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #43959 from heyihong/SPARK-46055.
    
    Authored-by: Yihong He <heyihong...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/ResolveCatalogs.scala    | 27 ++++++--
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  |  9 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  2 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 10 +--
 .../datasources/v2/DataSourceV2Strategy.scala      | 23 +++----
 .../apache/spark/sql/internal/CatalogImpl.scala    | 77 ++++++++--------------
 6 files changed, 75 insertions(+), 73 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index c0a3e330ad2..1bddac7f8f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.jdk.CollectionConverters._
+
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.util.ArrayImplicits._
 
@@ -50,10 +52,25 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
 
     case CurrentNamespace =>
      ResolvedNamespace(currentCatalog, catalogManager.currentNamespace.toImmutableArraySeq)
-    case UnresolvedNamespace(Seq()) =>
-      ResolvedNamespace(currentCatalog, Seq.empty[String])
-    case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
-      ResolvedNamespace(catalog, ns)
+    case UnresolvedNamespace(Seq(), fetchMetadata) =>
+      resolveNamespace(currentCatalog, Seq.empty[String], fetchMetadata)
+    case UnresolvedNamespace(CatalogAndNamespace(catalog, ns), fetchMetadata) =>
+      resolveNamespace(catalog, ns, fetchMetadata)
+  }
+
+  private def resolveNamespace(
+    catalog: CatalogPlugin,
+    ns: Seq[String],
+    fetchMetadata: Boolean): ResolvedNamespace = {
+    catalog match {
+      case supportsNS: SupportsNamespaces if fetchMetadata =>
+        ResolvedNamespace(
+          catalog,
+          ns,
+          supportsNS.loadNamespaceMetadata(ns.toArray).asScala.toMap)
+      case _ =>
+        ResolvedNamespace(catalog, ns)
+    }
   }
 
   private def resolveVariableName(nameParts: Seq[String]): ResolvedIdentifier = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 287fdc3d656..1a7d2501d68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -37,7 +37,9 @@ import org.apache.spark.util.ArrayImplicits._
 * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
  * [[ResolvedNamespace]] during analysis.
  */
-case class UnresolvedNamespace(multipartIdentifier: Seq[String]) extends UnresolvedLeafNode
+case class UnresolvedNamespace(
+    multipartIdentifier: Seq[String],
+    fetchMetadata: Boolean = false) extends UnresolvedLeafNode
 
 /**
 * A variant of [[UnresolvedNamespace]] that should be resolved to [[ResolvedNamespace]]
@@ -144,7 +146,10 @@ trait LeafNodeWithoutStats extends LeafNode {
 /**
  * A plan containing resolved namespace.
  */
-case class ResolvedNamespace(catalog: CatalogPlugin, namespace: Seq[String])
+case class ResolvedNamespace(
+    catalog: CatalogPlugin,
+    namespace: Seq[String],
+    metadata: Map[String, String] = Map.empty)
   extends LeafNodeWithoutStats {
   override def output: Seq[Attribute] = Nil
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index af1b2ed9b41..57fe6ae346f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4125,7 +4125,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
      ShowTablePartition(table, UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(spec)))
     }.getOrElse {
       val ns = if (ctx.identifierReference() != null) {
-        withIdentClause(ctx.identifierReference, UnresolvedNamespace)
+        withIdentClause(ctx.identifierReference, UnresolvedNamespace(_))
       } else {
         CurrentNamespace
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5fab89f4879..b4c549a9019 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -625,10 +625,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   private object DatabaseInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-      case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
-      case ResolvedNamespace(_, Seq()) =>
+      case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(_, Seq(), _) =>
        throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError()
-      case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
+      case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
       case _ =>
         assert(resolved.namespace.length > 1)
        throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError(
@@ -638,8 +638,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   private object DatabaseNameInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-      case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
-      case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
+      case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
       case _ =>
         assert(resolved.namespace.length > 1)
        throw QueryCompilationErrors.requiresSinglePartNamespaceError(resolved.namespace)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3f0dab11cda..ef82a81cb7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -330,7 +330,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case WriteToContinuousDataSource(writer, query, customMetrics) =>
      WriteToContinuousDataSourceExec(writer, planLater(query), customMetrics) :: Nil
 
-    case DescribeNamespace(ResolvedNamespace(catalog, ns), extended, output) =>
+    case DescribeNamespace(ResolvedNamespace(catalog, ns, _), extended, output) =>
      DescribeNamespaceExec(output, catalog.asNamespaceCatalog, ns, extended) :: Nil
 
    case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) =>
@@ -369,10 +369,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         invalidateTableCache(r),
         session.sharedState.cacheManager.cacheQuery) :: Nil
 
-    case SetNamespaceProperties(ResolvedNamespace(catalog, ns), properties) =>
+    case SetNamespaceProperties(ResolvedNamespace(catalog, ns, _), properties) =>
      AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil
 
-    case SetNamespaceLocation(ResolvedNamespace(catalog, ns), location) =>
+    case SetNamespaceLocation(ResolvedNamespace(catalog, ns, _), location) =>
       if (StringUtils.isEmpty(location)) {
         throw QueryExecutionErrors.invalidEmptyLocationError(location)
       }
@@ -381,13 +381,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         ns,
        Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(location))) :: Nil
 
-    case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) =>
+    case CommentOnNamespace(ResolvedNamespace(catalog, ns, _), comment) =>
       AlterNamespaceSetPropertiesExec(
         catalog.asNamespaceCatalog,
         ns,
         Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
 
-    case CreateNamespace(ResolvedNamespace(catalog, ns), ifNotExists, properties) =>
+    case CreateNamespace(ResolvedNamespace(catalog, ns, _), ifNotExists, properties) =>
       val location = properties.get(SupportsNamespaces.PROP_LOCATION)
       if (location.isDefined && location.get.isEmpty) {
         throw QueryExecutionErrors.invalidEmptyLocationError(location.get)
@@ -397,17 +397,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }.getOrElse(properties)
      CreateNamespaceExec(catalog.asNamespaceCatalog, ns, ifNotExists, finalProperties) :: Nil
 
-    case DropNamespace(ResolvedNamespace(catalog, ns), ifExists, cascade) =>
+    case DropNamespace(ResolvedNamespace(catalog, ns, _), ifExists, cascade) =>
       DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil
 
-    case ShowNamespaces(ResolvedNamespace(catalog, ns), pattern, output) =>
+    case ShowNamespaces(ResolvedNamespace(catalog, ns, _), pattern, output) =>
      ShowNamespacesExec(output, catalog.asNamespaceCatalog, ns, pattern) :: Nil
 
-    case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
+    case ShowTables(ResolvedNamespace(catalog, ns, _), pattern, output) =>
       ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
 
     case ShowTablesExtended(
-        ResolvedNamespace(catalog, ns),
+        ResolvedNamespace(catalog, ns, _),
         pattern,
         output) =>
      ShowTablesExtendedExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
@@ -416,7 +416,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       ShowTablePartitionExec(output, r.catalog, r.identifier,
        r.table.asPartitionable, Seq(part).asResolvedPartitionSpecs.head) :: Nil
 
-    case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns)) =>
+    case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns, _)) =>
       val catalogManager = session.sessionState.catalogManager
       val namespace = if (ns.nonEmpty) Some(ns) else None
      SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil
@@ -534,7 +534,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           s"DropIndex is not supported in this table ${table.name}.")
       }
 
-    case ShowFunctions(ResolvedNamespace(catalog, ns), userScope, systemScope, pattern, output) =>
+    case ShowFunctions(
+      ResolvedNamespace(catalog, ns, _), userScope, systemScope, pattern, output) =>
       ShowFunctionsExec(
         output,
         catalog.asFunctionCatalog,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index cd9f8f6be3e..c37f31b9fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.ShowTablesCommand
@@ -81,17 +81,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   /**
    * Returns a list of databases available across all sessions.
    */
-  override def listDatabases(): Dataset[Database] = {
-    val plan = ShowNamespaces(UnresolvedNamespace(Nil), None)
-    val qe = sparkSession.sessionState.executePlan(plan)
-    val catalog = qe.analyzed.collectFirst {
-      case ShowNamespaces(r: ResolvedNamespace, _, _) => r.catalog
-    }.get
-    val databases = qe.toRdd.collect().map { row =>
-      getNamespace(catalog, parseIdent(row.getString(0)))
-    }
-    CatalogImpl.makeDataset(databases.toImmutableArraySeq, sparkSession)
-  }
+  override def listDatabases(): Dataset[Database] = listDatabasesInternal(None)
 
   /**
   * Returns a list of databases (namespaces) which name match the specify pattern and
@@ -99,14 +89,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    *
    * @since 3.5.0
    */
-  override def listDatabases(pattern: String): Dataset[Database] = {
-    val plan = ShowNamespaces(UnresolvedNamespace(Nil), Some(pattern))
+  override def listDatabases(pattern: String): Dataset[Database] =
+    listDatabasesInternal(Some(pattern))
+
+  private def listDatabasesInternal(patternOpt: Option[String]): Dataset[Database] = {
+    val plan = ShowNamespaces(UnresolvedNamespace(Nil), patternOpt)
     val qe = sparkSession.sessionState.executePlan(plan)
     val catalog = qe.analyzed.collectFirst {
       case ShowNamespaces(r: ResolvedNamespace, _, _) => r.catalog
     }.get
     val databases = qe.toRdd.collect().map { row =>
-      getNamespace(catalog, parseIdent(row.getString(0)))
+      makeDatabase(Some(catalog.name()), row.getString(0))
     }
     CatalogImpl.makeDataset(databases.toImmutableArraySeq, sparkSession)
   }
@@ -415,35 +408,28 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     }
   }
 
-  private def getNamespace(catalog: CatalogPlugin, ns: Seq[String]): Database = catalog match {
-    case catalog: SupportsNamespaces =>
-      val metadata = catalog.loadNamespaceMetadata(ns.toArray)
-      new Database(
-        name = ns.quoted,
-        catalog = catalog.name,
-        description = metadata.get(SupportsNamespaces.PROP_COMMENT),
-        locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
-    // If the catalog doesn't support namespaces, we assume it's an implicit namespace, which always
-    // exists but has no metadata.
-    case catalog: CatalogPlugin =>
-      new Database(
-        name = ns.quoted,
-        catalog = catalog.name,
-        description = null,
-        locationUri = null)
-    case _ => new Database(name = ns.quoted, description = null, locationUri = null)
-  }
-
   /**
   * Gets the database with the specified name. This throws an `AnalysisException` when no
    * `Database` can be found.
    */
   override def getDatabase(dbName: String): Database = {
-    val namespace = resolveNamespace(dbName)
-    val plan = UnresolvedNamespace(namespace)
+    makeDatabase(None, dbName)
+  }
+
+  private def makeDatabase(catalogNameOpt: Option[String], dbName: String): Database = {
+    val idents = catalogNameOpt match {
+      case Some(catalogName) => catalogName +: parseIdent(dbName)
+      case None => resolveNamespace(dbName)
+    }
+    val plan = UnresolvedNamespace(idents, fetchMetadata = true)
     sparkSession.sessionState.executePlan(plan).analyzed match {
-      case ResolvedNamespace(catalog, namespace) =>
-        getNamespace(catalog, namespace)
+      case ResolvedNamespace(catalog, namespace, metadata) =>
+        new Database(
+          name = namespace.quoted,
+          catalog = catalog.name,
+          description = metadata.get(SupportsNamespaces.PROP_COMMENT).orNull,
+          locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION).orNull
+        )
      case _ => new Database(name = dbName, description = null, locationUri = null)
     }
   }
@@ -516,18 +502,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    * Checks if the database with the specified name exists.
    */
   override def databaseExists(dbName: String): Boolean = {
-    // To maintain backwards compatibility, we first treat the input is a simple dbName and check
-    // if sessionCatalog contains it. If no, we try to parse it, resolve catalog and namespace,
-    // and check if namespace exists in the catalog.
-    if (!sessionCatalog.databaseExists(dbName)) {
-      val plan = UnresolvedNamespace(parseIdent(dbName))
-      sparkSession.sessionState.executePlan(plan).analyzed match {
-        case ResolvedNamespace(catalog: SupportsNamespaces, ns) =>
-          catalog.namespaceExists(ns.toArray)
-        case _ => true
-      }
-    } else {
+    try {
+      getDatabase(dbName)
       true
+    } catch {
+      case _: NoSuchNamespaceException => false
     }
   }
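
A hypothetical end-to-end usage sketch of the rewritten APIs, assuming a SparkSession `spark` with a V2 catalog registered under the made-up name `testcat`; `getDatabase` now reads its description from `ResolvedNamespace.metadata` populated during analysis, and `databaseExists` simply delegates to it:

    // Create a namespace with a comment, then read it back via the Catalog API.
    spark.sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns COMMENT 'demo'")
    val db = spark.catalog.getDatabase("testcat.ns")
    println(db.description)                                  // "demo"
    println(spark.catalog.databaseExists("testcat.missing")) // false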
 

