This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 126b74a792c [SPARK-46055][SQL] Rewrite Catalog Database APIs implementation
126b74a792c is described below

commit 126b74a792c3b316686c7677704d5aef85ce9cbb
Author: Yihong He <heyihong...@gmail.com>
AuthorDate: Wed Nov 29 01:33:11 2023 +0100

    [SPARK-46055][SQL] Rewrite Catalog Database APIs implementation

    ### What changes were proposed in this pull request?

    - Rewrite the Catalog Database APIs implementation to use `ResolvedNamespace` to get metadata, instead of calling the catalog directly
    - Add a boolean flag `fetchMetadata` to `UnresolvedNamespace`
    - Add a `metadata` field to `ResolvedNamespace`

    ### Why are the changes needed?

    - Less duplicated logic. We should not invoke catalog APIs directly, but go through the analyzer.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests

    ### Was this patch authored or co-authored using generative AI tooling?

    No

    Closes #43959 from heyihong/SPARK-46055.

    Authored-by: Yihong He <heyihong...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/ResolveCatalogs.scala    | 27 ++++++--
 .../sql/catalyst/analysis/v2ResolutionPlans.scala  |  9 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  2 +-
 .../catalyst/analysis/ResolveSessionCatalog.scala  | 10 +--
 .../datasources/v2/DataSourceV2Strategy.scala      | 23 +++----
 .../apache/spark/sql/internal/CatalogImpl.scala    | 77 ++++++++--------------
 6 files changed, 75 insertions(+), 73 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
index c0a3e330ad2..1bddac7f8f1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveCatalogs.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.catalyst.analysis
 
+import scala.jdk.CollectionConverters._
+
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, LookupCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, Identifier, LookupCatalog, SupportsNamespaces}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.util.ArrayImplicits._
 
@@ -50,10 +52,25 @@ class ResolveCatalogs(val catalogManager: CatalogManager)
     case CurrentNamespace =>
       ResolvedNamespace(currentCatalog, catalogManager.currentNamespace.toImmutableArraySeq)
-    case UnresolvedNamespace(Seq()) =>
-      ResolvedNamespace(currentCatalog, Seq.empty[String])
-    case UnresolvedNamespace(CatalogAndNamespace(catalog, ns)) =>
-      ResolvedNamespace(catalog, ns)
+    case UnresolvedNamespace(Seq(), fetchMetadata) =>
+      resolveNamespace(currentCatalog, Seq.empty[String], fetchMetadata)
+    case UnresolvedNamespace(CatalogAndNamespace(catalog, ns), fetchMetadata) =>
+      resolveNamespace(catalog, ns, fetchMetadata)
+  }
+
+  private def resolveNamespace(
+      catalog: CatalogPlugin,
+      ns: Seq[String],
+      fetchMetadata: Boolean): ResolvedNamespace = {
+    catalog match {
+      case supportsNS: SupportsNamespaces if fetchMetadata =>
+        ResolvedNamespace(
+          catalog,
+          ns,
+          supportsNS.loadNamespaceMetadata(ns.toArray).asScala.toMap)
+      case _ =>
+        ResolvedNamespace(catalog, ns)
+    }
   }
 
   private def resolveVariableName(nameParts: Seq[String]): ResolvedIdentifier = {
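The core of the change is the new `resolveNamespace` helper above: namespace metadata is loaded eagerly during analysis only when the catalog implements `SupportsNamespaces` and the plan asked for it via `fetchMetadata`. A minimal self-contained sketch of that dispatch, using stand-in types rather than Spark's real classes (whose `loadNamespaceMetadata` returns a `java.util.Map`, hence the `.asScala.toMap` in the actual rule):

```scala
// Stand-ins for the org.apache.spark.sql.connector.catalog types, simplified
// so loadNamespaceMetadata returns a Scala Map directly.
trait CatalogPlugin { def name: String }

trait SupportsNamespaces extends CatalogPlugin {
  def loadNamespaceMetadata(ns: Array[String]): Map[String, String]
}

case class ResolvedNamespace(
    catalog: CatalogPlugin,
    namespace: Seq[String],
    metadata: Map[String, String] = Map.empty)

def resolveNamespace(
    catalog: CatalogPlugin,
    ns: Seq[String],
    fetchMetadata: Boolean): ResolvedNamespace = catalog match {
  // Eager metadata load: only for catalogs that can serve it, and only when
  // the unresolved plan explicitly requested it.
  case supportsNS: SupportsNamespaces if fetchMetadata =>
    ResolvedNamespace(catalog, ns, supportsNS.loadNamespaceMetadata(ns.toArray))
  case _ =>
    ResolvedNamespace(catalog, ns)
}
```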
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
index 287fdc3d656..1a7d2501d68 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/v2ResolutionPlans.scala
@@ -37,7 +37,9 @@ import org.apache.spark.util.ArrayImplicits._
  * Holds the name of a namespace that has yet to be looked up in a catalog. It will be resolved to
  * [[ResolvedNamespace]] during analysis.
  */
-case class UnresolvedNamespace(multipartIdentifier: Seq[String]) extends UnresolvedLeafNode
+case class UnresolvedNamespace(
+    multipartIdentifier: Seq[String],
+    fetchMetadata: Boolean = false) extends UnresolvedLeafNode
 
 /**
  * A variant of [[UnresolvedNamespace]] that should be resolved to [[ResolvedNamespace]]
@@ -144,7 +146,10 @@ trait LeafNodeWithoutStats extends LeafNode {
 /**
  * A plan containing resolved namespace.
  */
-case class ResolvedNamespace(catalog: CatalogPlugin, namespace: Seq[String])
+case class ResolvedNamespace(
+    catalog: CatalogPlugin,
+    namespace: Seq[String],
+    metadata: Map[String, String] = Map.empty)
   extends LeafNodeWithoutStats {
   override def output: Seq[Attribute] = Nil
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index af1b2ed9b41..57fe6ae346f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4125,7 +4125,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
         ShowTablePartition(table, UnresolvedPartitionSpec(visitNonOptionalPartitionSpec(spec)))
       }.getOrElse {
         val ns = if (ctx.identifierReference() != null) {
-          withIdentClause(ctx.identifierReference, UnresolvedNamespace)
+          withIdentClause(ctx.identifierReference, UnresolvedNamespace(_))
         } else {
           CurrentNamespace
         }
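A side effect of giving `UnresolvedNamespace` a second constructor parameter: its companion object no longer conforms to the `Seq[String] => LogicalPlan` that `withIdentClause` expects, which is why `AstBuilder` switches to the explicit eta-expansion `UnresolvedNamespace(_)`. A tiny self-contained illustration of the general Scala rule, with a hypothetical `Box` standing in for the real plan node:

```scala
case class Box(id: Seq[String], flag: Boolean = false)

def build(f: Seq[String] => Box): Box = f(Seq("a"))

// build(Box)          // no longer compiles once Box has two parameters:
//                     // the companion is not a Function1 anymore.
val b = build(Box(_))  // OK: expands to `ids => Box(ids)`, flag takes its default
```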
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index 5fab89f4879..b4c549a9019 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -625,10 +625,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   private object DatabaseInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-      case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
-      case ResolvedNamespace(_, Seq()) =>
+      case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(_, Seq(), _) =>
         throw QueryCompilationErrors.databaseFromV1SessionCatalogNotSpecifiedError()
-      case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
+      case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
       case _ =>
         assert(resolved.namespace.length > 1)
         throw QueryCompilationErrors.nestedDatabaseUnsupportedByV1SessionCatalogError(
@@ -638,8 +638,8 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
 
   private object DatabaseNameInSessionCatalog {
     def unapply(resolved: ResolvedNamespace): Option[String] = resolved match {
-      case ResolvedNamespace(catalog, _) if !isSessionCatalog(catalog) => None
-      case ResolvedNamespace(_, Seq(dbName)) => Some(dbName)
+      case ResolvedNamespace(catalog, _, _) if !isSessionCatalog(catalog) => None
+      case ResolvedNamespace(_, Seq(dbName), _) => Some(dbName)
       case _ =>
         assert(resolved.namespace.length > 1)
         throw QueryCompilationErrors.requiresSinglePartNamespaceError(resolved.namespace)
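The churn in these extractors (and in the planner rules below) is purely mechanical: Scala constructor patterns must bind every field of a case class, so adding `metadata` forces a wildcard into each existing `ResolvedNamespace(...)` pattern. A hypothetical alternative, not taken by this PR, is a custom extractor that matches only the stable fields and shields call sites from future arity changes (types as in the stand-in sketch further up):

```scala
// Hypothetical width-stable extractor for ResolvedNamespace.
object NamespaceOf {
  def unapply(r: ResolvedNamespace): Some[(CatalogPlugin, Seq[String])] =
    Some((r.catalog, r.namespace))
}

// Call sites would then be immune to new fields:
//   case DescribeNamespace(NamespaceOf(catalog, ns), extended, output) => ...
```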
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 3f0dab11cda..ef82a81cb7e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -330,7 +330,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
     case WriteToContinuousDataSource(writer, query, customMetrics) =>
       WriteToContinuousDataSourceExec(writer, planLater(query), customMetrics) :: Nil
 
-    case DescribeNamespace(ResolvedNamespace(catalog, ns), extended, output) =>
+    case DescribeNamespace(ResolvedNamespace(catalog, ns, _), extended, output) =>
       DescribeNamespaceExec(output, catalog.asNamespaceCatalog, ns, extended) :: Nil
 
     case DescribeRelation(r: ResolvedTable, partitionSpec, isExtended, output) =>
@@ -369,10 +369,10 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         invalidateTableCache(r),
         session.sharedState.cacheManager.cacheQuery) :: Nil
 
-    case SetNamespaceProperties(ResolvedNamespace(catalog, ns), properties) =>
+    case SetNamespaceProperties(ResolvedNamespace(catalog, ns, _), properties) =>
       AlterNamespaceSetPropertiesExec(catalog.asNamespaceCatalog, ns, properties) :: Nil
 
-    case SetNamespaceLocation(ResolvedNamespace(catalog, ns), location) =>
+    case SetNamespaceLocation(ResolvedNamespace(catalog, ns, _), location) =>
       if (StringUtils.isEmpty(location)) {
         throw QueryExecutionErrors.invalidEmptyLocationError(location)
       }
@@ -381,13 +381,13 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
         ns,
         Map(SupportsNamespaces.PROP_LOCATION -> makeQualifiedDBObjectPath(location))) :: Nil
 
-    case CommentOnNamespace(ResolvedNamespace(catalog, ns), comment) =>
+    case CommentOnNamespace(ResolvedNamespace(catalog, ns, _), comment) =>
       AlterNamespaceSetPropertiesExec(
         catalog.asNamespaceCatalog,
         ns,
         Map(SupportsNamespaces.PROP_COMMENT -> comment)) :: Nil
 
-    case CreateNamespace(ResolvedNamespace(catalog, ns), ifNotExists, properties) =>
+    case CreateNamespace(ResolvedNamespace(catalog, ns, _), ifNotExists, properties) =>
       val location = properties.get(SupportsNamespaces.PROP_LOCATION)
       if (location.isDefined && location.get.isEmpty) {
         throw QueryExecutionErrors.invalidEmptyLocationError(location.get)
@@ -397,17 +397,17 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       }.getOrElse(properties)
       CreateNamespaceExec(catalog.asNamespaceCatalog, ns, ifNotExists, finalProperties) :: Nil
 
-    case DropNamespace(ResolvedNamespace(catalog, ns), ifExists, cascade) =>
+    case DropNamespace(ResolvedNamespace(catalog, ns, _), ifExists, cascade) =>
      DropNamespaceExec(catalog, ns, ifExists, cascade) :: Nil
 
-    case ShowNamespaces(ResolvedNamespace(catalog, ns), pattern, output) =>
+    case ShowNamespaces(ResolvedNamespace(catalog, ns, _), pattern, output) =>
       ShowNamespacesExec(output, catalog.asNamespaceCatalog, ns, pattern) :: Nil
 
-    case ShowTables(ResolvedNamespace(catalog, ns), pattern, output) =>
+    case ShowTables(ResolvedNamespace(catalog, ns, _), pattern, output) =>
       ShowTablesExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
 
     case ShowTablesExtended(
-        ResolvedNamespace(catalog, ns),
+        ResolvedNamespace(catalog, ns, _),
         pattern,
         output) =>
       ShowTablesExtendedExec(output, catalog.asTableCatalog, ns, pattern) :: Nil
@@ -416,7 +416,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
       ShowTablePartitionExec(output, r.catalog, r.identifier,
         r.table.asPartitionable, Seq(part).asResolvedPartitionSpecs.head) :: Nil
 
-    case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns)) =>
+    case SetCatalogAndNamespace(ResolvedNamespace(catalog, ns, _)) =>
       val catalogManager = session.sessionState.catalogManager
       val namespace = if (ns.nonEmpty) Some(ns) else None
       SetCatalogAndNamespaceExec(catalogManager, Some(catalog.name()), namespace) :: Nil
@@ -534,7 +534,8 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
           s"DropIndex is not supported in this table ${table.name}.")
       }
 
-    case ShowFunctions(ResolvedNamespace(catalog, ns), userScope, systemScope, pattern, output) =>
+    case ShowFunctions(
+        ResolvedNamespace(catalog, ns, _), userScope, systemScope, pattern, output) =>
       ShowFunctionsExec(
         output,
         catalog.asFunctionCatalog,
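In the planner rules above the new `metadata` field is always bound to `_`: physical exec nodes such as `DescribeNamespaceExec` load whatever namespace metadata they need themselves, so nothing has to flow through the pattern. For orientation, these are the kinds of commands those rules plan; an illustrative snippet that assumes a SparkSession `spark` with a v2 catalog registered under the hypothetical name `testcat`:

```scala
// Each statement below hits one of the strategy cases in the diff above.
spark.sql("CREATE NAMESPACE IF NOT EXISTS testcat.ns1 COMMENT 'demo namespace'")
spark.sql("DESCRIBE NAMESPACE EXTENDED testcat.ns1").show(truncate = false)
spark.sql("SHOW NAMESPACES IN testcat").show()
spark.sql("DROP NAMESPACE testcat.ns1")
```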
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index cd9f8f6be3e..c37f31b9fa6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
 import org.apache.spark.sql.catalyst.expressions.{Expression, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.{CreateTable, LocalRelation, LogicalPlan, OptionList, RecoverPartitions, ShowFunctions, ShowNamespaces, ShowTables, UnresolvedTableSpec, View}
 import org.apache.spark.sql.catalyst.types.DataTypeUtils
-import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, SupportsNamespaces, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, SupportsNamespaces, TableCatalog}
 import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.{CatalogHelper, MultipartIdentifierHelper, NamespaceHelper, TransformHelper}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.command.ShowTablesCommand
@@ -81,17 +81,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
 
   /**
    * Returns a list of databases available across all sessions.
    */
-  override def listDatabases(): Dataset[Database] = {
-    val plan = ShowNamespaces(UnresolvedNamespace(Nil), None)
-    val qe = sparkSession.sessionState.executePlan(plan)
-    val catalog = qe.analyzed.collectFirst {
-      case ShowNamespaces(r: ResolvedNamespace, _, _) => r.catalog
-    }.get
-    val databases = qe.toRdd.collect().map { row =>
-      getNamespace(catalog, parseIdent(row.getString(0)))
-    }
-    CatalogImpl.makeDataset(databases.toImmutableArraySeq, sparkSession)
-  }
+  override def listDatabases(): Dataset[Database] = listDatabasesInternal(None)
 
   /**
    * Returns a list of databases (namespaces) which name match the specify pattern and
@@ -99,14 +89,17 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
    *
    * @since 3.5.0
    */
-  override def listDatabases(pattern: String): Dataset[Database] = {
-    val plan = ShowNamespaces(UnresolvedNamespace(Nil), Some(pattern))
+  override def listDatabases(pattern: String): Dataset[Database] =
+    listDatabasesInternal(Some(pattern))
+
+  private def listDatabasesInternal(patternOpt: Option[String]): Dataset[Database] = {
+    val plan = ShowNamespaces(UnresolvedNamespace(Nil), patternOpt)
     val qe = sparkSession.sessionState.executePlan(plan)
     val catalog = qe.analyzed.collectFirst {
       case ShowNamespaces(r: ResolvedNamespace, _, _) => r.catalog
     }.get
     val databases = qe.toRdd.collect().map { row =>
-      getNamespace(catalog, parseIdent(row.getString(0)))
+      makeDatabase(Some(catalog.name()), row.getString(0))
     }
     CatalogImpl.makeDataset(databases.toImmutableArraySeq, sparkSession)
   }
@@ -415,35 +408,28 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
     }
   }
 
-  private def getNamespace(catalog: CatalogPlugin, ns: Seq[String]): Database = catalog match {
-    case catalog: SupportsNamespaces =>
-      val metadata = catalog.loadNamespaceMetadata(ns.toArray)
-      new Database(
-        name = ns.quoted,
-        catalog = catalog.name,
-        description = metadata.get(SupportsNamespaces.PROP_COMMENT),
-        locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION))
-    // If the catalog doesn't support namespaces, we assume it's an implicit namespace, which always
-    // exists but has no metadata.
-    case catalog: CatalogPlugin =>
-      new Database(
-        name = ns.quoted,
-        catalog = catalog.name,
-        description = null,
-        locationUri = null)
-    case _ => new Database(name = ns.quoted, description = null, locationUri = null)
-  }
-
   /**
    * Gets the database with the specified name. This throws an `AnalysisException` when no
    * `Database` can be found.
    */
   override def getDatabase(dbName: String): Database = {
-    val namespace = resolveNamespace(dbName)
-    val plan = UnresolvedNamespace(namespace)
+    makeDatabase(None, dbName)
+  }
+
+  private def makeDatabase(catalogNameOpt: Option[String], dbName: String): Database = {
+    val idents = catalogNameOpt match {
+      case Some(catalogName) => catalogName +: parseIdent(dbName)
+      case None => resolveNamespace(dbName)
+    }
+    val plan = UnresolvedNamespace(idents, fetchMetadata = true)
     sparkSession.sessionState.executePlan(plan).analyzed match {
-      case ResolvedNamespace(catalog, namespace) =>
-        getNamespace(catalog, namespace)
+      case ResolvedNamespace(catalog, namespace, metadata) =>
+        new Database(
+          name = namespace.quoted,
+          catalog = catalog.name,
+          description = metadata.get(SupportsNamespaces.PROP_COMMENT).orNull,
+          locationUri = metadata.get(SupportsNamespaces.PROP_LOCATION).orNull
+        )
       case _ => new Database(name = dbName, description = null, locationUri = null)
     }
   }
@@ -516,18 +502,11 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   /**
    * Checks if the database with the specified name exists.
    */
   override def databaseExists(dbName: String): Boolean = {
-    // To maintain backwards compatibility, we first treat the input is a simple dbName and check
-    // if sessionCatalog contains it. If no, we try to parse it, resolve catalog and namespace,
-    // and check if namespace exists in the catalog.
-    if (!sessionCatalog.databaseExists(dbName)) {
-      val plan = UnresolvedNamespace(parseIdent(dbName))
-      sparkSession.sessionState.executePlan(plan).analyzed match {
-        case ResolvedNamespace(catalog: SupportsNamespaces, ns) =>
-          catalog.namespaceExists(ns.toArray)
-        case _ => true
-      }
-    } else {
+    try {
+      getDatabase(dbName)
       true
+    } catch {
+      case _: NoSuchNamespaceException => false
     }
   }
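With the `CatalogImpl` changes, `listDatabases`, `getDatabase`, and `databaseExists` all funnel through the analyzer (an `UnresolvedNamespace(..., fetchMetadata = true)` resolving to a `ResolvedNamespace` that carries the metadata) instead of calling catalog APIs directly; observable behavior is meant to stay the same. A quick sketch of exercising that path through the public Catalog API, not a test from this PR:

```scala
import org.apache.spark.sql.SparkSession

object CatalogApiDemo extends App {
  val spark = SparkSession.builder()
    .master("local[*]")
    .appName("catalog-api-demo")
    .getOrCreate()

  // Both overloads now share the private listDatabasesInternal helper.
  spark.catalog.listDatabases().show()
  spark.catalog.listDatabases("def*").show() // pattern overload, since 3.5.0

  // getDatabase resolves the namespace through the analyzer and reads the
  // comment/location from ResolvedNamespace.metadata.
  val db = spark.catalog.getDatabase("default")
  println(s"${db.name} -> ${db.locationUri}")

  // databaseExists is now getDatabase plus a NoSuchNamespaceException catch.
  assert(spark.catalog.databaseExists("default"))
  assert(!spark.catalog.databaseExists("no_such_db"))

  spark.stop()
}
```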
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org