This is an automated email from the ASF dual-hosted git repository.
cloud-fan pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new 44818a74dc17 [SPARK-56697][SQL][DML] Refactor Catalog Manager
44818a74dc17 is described below
commit 44818a74dc17b95e7f74413a21decfb6482203e7
Author: Andreas Chatzistergiou <[email protected]>
AuthorDate: Mon May 25 14:40:19 2026 +0800
[SPARK-56697][SQL][DML] Refactor Catalog Manager
### What changes were proposed in this pull request?
Currently, the `TransactionAwareCatalogManager` is a decorator of the
`CatalogManager`. This introduces some risk because newly introduced methods in
the `CatalogManager` might not be implemented in
`TransactionAwareCatalogManager`. To resolve this issue we create a new
`CatalogManager` trait that all `CatalogManager` implementations can implement.
Furthermore, the PR fixes an issue where previously every `withTransaction`
call constructed a `TransactionAwareCatalogManager` (TACM) whose
super-constructor re-ran
`v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)`, rebinding
the v1 SessionCatalog's session-function-kinds source to the transient TACM.
Because the old TACM never overrode `sessionFunctionKindsForUnqualifiedResolution`,
subsequent unqualified function resolution that took the path-kinds route
observed TACM's own (always-empty) `_currentCatalog [...]
### Why are the changes needed?
Ensures that any method newly added to the `CatalogManager` trait has to be
implemented in both the `DefaultCatalogManager` and the
`TransactionAwareCatalogManager`.
### Does this PR introduce _any_ user-facing change?
Yes. It fixes the session-function-kinds rebinding issue described above.
### How was this patch tested?
Refactor PR. Used existing tests.
### Was this patch authored or co-authored using generative AI tooling?
Claude Sonnet 4.6.
Closes #55646 from andreaschat-db/dsv2TransactionCatalogManagerRefactor.
Authored-by: Andreas Chatzistergiou <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit e0fff75f68aa2f9bf39f2910820f235ed2cab3cb)
Signed-off-by: Wenchen Fan <[email protected]>
---
.../spark/sql/catalyst/analysis/Analyzer.scala | 4 +-
.../sql/catalyst/catalog/SessionCatalog.scala | 5 +-
.../spark/sql/catalyst/util/GeneratedColumn.scala | 4 +-
.../catalyst/util/ResolveDefaultColumnsUtil.scala | 4 +-
.../sql/connector/catalog/CatalogManager.scala | 142 ++++++++++++++-------
.../catalog/TransactionAwareCatalogManager.scala | 82 +++++++++---
.../AnalyzerExtensionPropagationSuite.scala | 4 +-
.../catalyst/analysis/LookupFunctionsSuite.scala | 8 +-
.../OptimizerStructuralIntegrityCheckerSuite.scala | 4 +-
.../catalyst/optimizer/SimpleTestOptimizer.scala | 4 +-
.../connector/catalog/CatalogManagerSuite.scala | 8 +-
.../sql/connect/ProtoToParsedPlanTestSuite.scala | 4 +-
.../sql/internal/BaseSessionStateBuilder.scala | 4 +-
13 files changed, 190 insertions(+), 87 deletions(-)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d123d36c23b2..3f8ce7e8b70a 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -75,7 +75,7 @@ import org.apache.spark.util.ArrayImplicits._
* functions.
*/
object SimpleAnalyzer extends Analyzer(
- new CatalogManager(
+ new DefaultCatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
@@ -323,7 +323,7 @@ class Analyzer(
// Only for tests.
def this(catalog: SessionCatalog) = {
- this(new CatalogManager(FakeV2SessionCatalog, catalog))
+ this(new DefaultCatalogManager(FakeV2SessionCatalog, catalog))
}
def getRelationResolution: RelationResolution = relationResolution
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9e5a2176612c..69af5ab80eec 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -121,7 +121,7 @@ class SessionCatalog(
/**
* Live PATH for session function kinds. Set from
- * [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor
via
+ * [[org.apache.spark.sql.connector.catalog.DefaultCatalogManager]]'s
constructor via
* [[bindCatalogManagerForSessionFunctionKinds]] so unqualified lookups and
the security check
* that blocks temp functions from shadowing builtins read the effective SQL
PATH (post-`SET
* PATH`, with [[SQLConf.DEFAULT_PATH]] and [[SQLConf.defaultPathOrder]]
fallbacks already
@@ -135,7 +135,8 @@ class SessionCatalog(
/**
* Wire live PATH-derived session function kinds from the session
[[CatalogManager]].
- * Called once from
[[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor.
+ * Called once from
[[org.apache.spark.sql.connector.catalog.DefaultCatalogManager]]'s
+ * constructor.
*/
private[sql] def bindCatalogManagerForSessionFunctionKinds(cm:
CatalogManager): Unit = {
catalogManagerForSessionFunctionKinds = Some(cm)
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
index 8d88b05546ed..793c994fdd43 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -25,7 +25,7 @@ import
org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier,
TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{DefaultCatalogManager,
Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -205,5 +205,5 @@ object GeneratedColumn {
* Analyzer for processing generated column expressions using built-in
functions only.
*/
object GeneratedColumnAnalyzer extends Analyzer(
- new CatalogManager(BuiltInFunctionCatalog,
BuiltInFunctionCatalog.v1Catalog)) {
+ new DefaultCatalogManager(BuiltInFunctionCatalog,
BuiltInFunctionCatalog.v1Catalog)) {
}
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 9c077630f33d..491fb7f24a22 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -34,7 +34,7 @@ import
org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Column,
DefaultValue, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column,
DefaultCatalogManager, DefaultValue, FunctionCatalog, Identifier, TableCatalog,
TableCatalogCapability}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
@@ -597,7 +597,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
* This is an Analyzer for processing default column values using built-in
functions only.
*/
object DefaultColumnAnalyzer extends Analyzer(
- new CatalogManager(BuiltInFunctionCatalog,
BuiltInFunctionCatalog.v1Catalog)) {
+ new DefaultCatalogManager(BuiltInFunctionCatalog,
BuiltInFunctionCatalog.v1Catalog)) {
}
/**
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 3aad52dbd1d0..4dd8af5eb37e 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -26,34 +26,100 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
/**
- * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered
catalogs, and allow
- * the caller to look up a catalog by name.
+ * A thread-safe contract for managing [[CatalogPlugin]]s. Implementations
resolve catalogs by
+ * name and maintain the current catalog and namespace for a session.
*
* There are still many commands (e.g. ANALYZE TABLE) that do not support v2
catalog API. They
* ignore the current catalog and blindly go to the v1 `SessionCatalog`. To
avoid tracking current
- * namespace in both `SessionCatalog` and `CatalogManger`, we let
`CatalogManager` to set/get
+ * namespace in both `SessionCatalog` and `CatalogManager`, implementations
set/get the
* current database of `SessionCatalog` when the current catalog is the
session catalog.
+ *
+ * Two implementations exist: [[DefaultCatalogManager]] owns the mutable
session state;
+ * [[TransactionAwareCatalogManager]] wraps another manager and redirects
catalog lookups to the
+ * active transaction's catalog.
*/
// TODO: all commands should look up table from the current catalog. The
`SessionCatalog` doesn't
// need to track current database at all.
-private[sql]
-class CatalogManager(
- val defaultSessionCatalog: CatalogPlugin,
- val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
+private[sql] trait CatalogManager extends SQLConfHelper with Logging {
+
+ // ---- Underlying state exposed by implementations ----
+ def defaultSessionCatalog: CatalogPlugin
+ def v1SessionCatalog: SessionCatalog
+ def tempVariableManager: TempVariableManager
+
+ // ---- Catalog access ----
+ def catalog(name: String): CatalogPlugin
+ private[sql] def v2SessionCatalog: CatalogPlugin
+ def listCatalogs(pattern: Option[String]): Seq[String]
+ def currentCatalog: CatalogPlugin
+ def setCurrentCatalog(catalogName: String): Unit
+ def isCatalogRegistered(name: String): Boolean = {
+ try {
+ catalog(name)
+ true
+ } catch {
+ case _: CatalogNotFoundException => false
+ }
+ }
+
+ // ---- Transactions ----
+ def transaction: Option[Transaction] = None
+
+ def withTransaction(transaction: Transaction): CatalogManager
+
+ // ---- Namespace ----
+ def currentNamespace: Array[String]
+ def setCurrentNamespace(namespace: Array[String]): Unit
+
+ // ---- Session path ----
+ def sessionPathEntries: Option[Seq[SessionPathEntry]]
+ def storedSessionPathEntries: Option[Seq[SessionPathEntry]]
+ def confDefaultPathEntries: Option[Seq[SessionPathEntry]]
+ def setSessionPath(entries: Seq[SessionPathEntry]): Unit
+ def clearSessionPath(): Unit
+ private[sql] def copySessionPathFrom(other: CatalogManager): Unit
+ def currentPathString: String
+ def sqlResolutionPathEntries(
+ pathDefaultCatalog: String,
+ pathDefaultNamespace: Seq[String],
+ expandCatalog: String,
+ expandNamespace: Seq[String]): Seq[Seq[String]]
+ def sqlResolutionPathEntries(
+ currentCatalog: String,
+ currentNamespace: Seq[String]): Seq[Seq[String]]
+ def isSystemSessionOnPath: Boolean
+ def resolutionPathEntriesForAnalysis(
+ pinnedEntries: Option[Seq[Seq[String]]],
+ viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]]
+ def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionFunctionKind]
+
+ // Reset the manager to its initial state. Only used in tests.
+ private[sql] def reset(): Unit
+}
+
+/**
+ * Default [[CatalogManager]] implementation. Owns the mutable session state
+ * (registered catalogs, current catalog/namespace, session path).
+ */
+private[sql] class DefaultCatalogManager(
+ override val defaultSessionCatalog: CatalogPlugin,
+ override val v1SessionCatalog: SessionCatalog) extends CatalogManager {
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._
private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]
// TODO: create a real SYSTEM catalog to host `TempVariableManager` under
the SESSION namespace.
- val tempVariableManager: TempVariableManager = new TempVariableManager
+ override val tempVariableManager: TempVariableManager = new
TempVariableManager
// Wire `SessionCatalog`'s fast-path kinds to the live SQL PATH. The kinds
list itself is
// pure data conversion (system entries from the path, in path order); the
*decision* to use
@@ -61,7 +127,7 @@ class CatalogManager(
// [[CatalogManager.systemFunctionKindsFromPath]]).
v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)
- def catalog(name: String): CatalogPlugin = synchronized {
+ override def catalog(name: String): CatalogPlugin = synchronized {
if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
v2SessionCatalog
} else {
@@ -69,20 +135,6 @@ class CatalogManager(
}
}
- def transaction: Option[Transaction] = None
-
- def withTransaction(transaction: Transaction): CatalogManager =
- new TransactionAwareCatalogManager(this, transaction)
-
- def isCatalogRegistered(name: String): Boolean = {
- try {
- catalog(name)
- true
- } catch {
- case _: CatalogNotFoundException => false
- }
- }
-
private def loadV2SessionCatalog(): CatalogPlugin = {
Catalogs.load(SESSION_CATALOG_NAME, conf) match {
case extension: CatalogExtension =>
@@ -101,16 +153,19 @@ class CatalogManager(
* This happens when the source implementation extends the v2 TableProvider
API and is not listed
* in the fallback configuration, spark.sql.sources.useV1SourceList
*/
- private[sql] def v2SessionCatalog: CatalogPlugin = {
+ override private[sql] def v2SessionCatalog: CatalogPlugin = {
conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) match {
case "builtin" => defaultSessionCatalog
case _ => catalogs.getOrElseUpdate(SESSION_CATALOG_NAME,
loadV2SessionCatalog())
}
}
+ override def withTransaction(transaction: Transaction): CatalogManager =
+ new TransactionAwareCatalogManager(this, transaction)
+
private var _currentNamespace: Option[Array[String]] = None
- def currentNamespace: Array[String] = {
+ override def currentNamespace: Array[String] = {
val defaultNamespace = if (currentCatalog.name() == SESSION_CATALOG_NAME) {
Array(v1SessionCatalog.getCurrentDatabase)
} else {
@@ -132,7 +187,7 @@ class CatalogManager(
}
}
- def setCurrentNamespace(namespace: Array[String]): Unit = {
+ override def setCurrentNamespace(namespace: Array[String]): Unit = {
// SPARK-56939: do NOT hold [[CatalogManager]]'s intrinsic lock across the
callbacks below.
// [[v1SessionCatalog.setCurrentDatabaseWithNameCheck]] briefly
synchronizes on
// [[SessionCatalog]], and concurrent unqualified function resolution
acquires the
@@ -175,8 +230,6 @@ class CatalogManager(
}
}
- import CatalogManager.SessionPathEntry
-
private var _sessionPath: Option[Seq[SessionPathEntry]] = None
/**
@@ -196,13 +249,14 @@ class CatalogManager(
* [[currentCatalog]] falls back to [[SQLConf#DEFAULT_CATALOG]]). Returns
`None` when
* [[SQLConf#PATH_ENABLED]] is false or both sources are empty.
*/
- def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
+ override def sessionPathEntries: Option[Seq[SessionPathEntry]] =
synchronized {
if (!conf.pathEnabled) None
else _sessionPath.orElse(confDefaultPathEntries)
}
/** Raw `_sessionPath` (post-`SET PATH`), without the
[[SQLConf#DEFAULT_PATH]] fallback. */
- def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
_sessionPath }
+ override def storedSessionPathEntries: Option[Seq[SessionPathEntry]] =
+ synchronized { _sessionPath }
/**
* Parsed and expanded [[SQLConf#DEFAULT_PATH]] value, or `None` when the
conf is empty.
@@ -216,7 +270,7 @@ class CatalogManager(
* `USE SCHEMA`) is dead code rather than an error. Cached so the hot path
is a single
* atomic load on conf-stable sessions.
*/
- def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
+ override def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
val confValue = conf.defaultPath
if (confValue == null || confValue.trim.isEmpty) {
confDefaultPathCache.set(None)
@@ -236,15 +290,15 @@ class CatalogManager(
}
}
- def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
+ override def setSessionPath(entries: Seq[SessionPathEntry]): Unit =
synchronized {
_sessionPath = Some(entries)
}
- def clearSessionPath(): Unit = synchronized {
+ override def clearSessionPath(): Unit = synchronized {
_sessionPath = None
}
- private[sql] def copySessionPathFrom(other: CatalogManager): Unit =
synchronized {
+ override private[sql] def copySessionPathFrom(other: CatalogManager): Unit =
synchronized {
_sessionPath = other.storedSessionPathEntries
}
@@ -263,7 +317,7 @@ class CatalogManager(
* new SC->CM ordering must take `currentPathString` (or any other CM->SC
nest) into
* account to avoid resurrecting the deadlock.
*/
- def currentPathString: String = synchronized {
+ override def currentPathString: String = synchronized {
import CatalogV2Implicits._
sessionPathEntries match {
case Some(entries) =>
@@ -282,7 +336,7 @@ class CatalogManager(
* When PATH is in effect (stored or via the [[SQLConf#DEFAULT_PATH]] conf),
uses the
* resolved entries.
*/
- def sqlResolutionPathEntries(
+ override def sqlResolutionPathEntries(
pathDefaultCatalog: String,
pathDefaultNamespace: Seq[String],
expandCatalog: String,
@@ -299,7 +353,7 @@ class CatalogManager(
}
/** Session-catalog overload. */
- def sqlResolutionPathEntries(
+ override def sqlResolutionPathEntries(
currentCatalog: String,
currentNamespace: Seq[String]): Seq[Seq[String]] =
sqlResolutionPathEntries(
@@ -330,7 +384,7 @@ class CatalogManager(
*
[[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]])
* MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this
method.
*/
- def sessionFunctionKindsForUnqualifiedResolution():
Seq[SessionCatalog.SessionFunctionKind] = {
+ override def sessionFunctionKindsForUnqualifiedResolution():
Seq[SessionFunctionKind] = {
// SPARK-56939: read v1's current database before taking the CM lock; see
the method
// doc for why the resulting staleness is harmless for the kinds list.
val v1CurrentDb = v1SessionCatalog.getCurrentDatabase
@@ -358,7 +412,7 @@ class CatalogManager(
* invariant ever changes, this short-circuit must be revisited.
* Inspecting effective entries directly avoids loading the configured
default catalog.
*/
- def isSystemSessionOnPath: Boolean = synchronized {
+ override def isSystemSessionOnPath: Boolean = synchronized {
if (!conf.pathEnabled) return true
sessionPathEntries match {
case None => true
@@ -385,7 +439,7 @@ class CatalogManager(
* (typically
`AnalysisContext.catalogAndNamespace`); empty when
* not resolving a view body.
*/
- def resolutionPathEntriesForAnalysis(
+ override def resolutionPathEntriesForAnalysis(
pinnedEntries: Option[Seq[Seq[String]]],
viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] = {
pinnedEntries match {
@@ -409,11 +463,11 @@ class CatalogManager(
private var _currentCatalogName: Option[String] = None
- def currentCatalog: CatalogPlugin = synchronized {
+ override def currentCatalog: CatalogPlugin = synchronized {
catalog(_currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG)))
}
- def setCurrentCatalog(catalogName: String): Unit = {
+ override def setCurrentCatalog(catalogName: String): Unit = {
// SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting
[[CatalogManager]]'s lock
// across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on
// [[SessionCatalog]]) to prevent a lock-order inversion with concurrent
unqualified
@@ -451,7 +505,7 @@ class CatalogManager(
}
}
- def listCatalogs(pattern: Option[String]): Seq[String] = {
+ override def listCatalogs(pattern: Option[String]): Seq[String] = {
val allCatalogs = (synchronized(catalogs.keys.toSeq) :+
SESSION_CATALOG_NAME).distinct.sorted
pattern.map(StringUtils.filterPattern(allCatalogs,
_)).getOrElse(allCatalogs)
}
@@ -463,7 +517,7 @@ class CatalogManager(
// calls back into [[v1SessionCatalog]]. Test-only callers don't race
against unqualified
// function resolution today, but keeping the contract symmetric prevents
future test
// helpers (e.g. session reset in a concurrent harness) from reintroducing
the cycle.
- private[sql] def reset(): Unit = {
+ override private[sql] def reset(): Unit = {
synchronized {
catalogs.clear()
_currentNamespace = None
diff --git
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
index 70079357b6dd..e7a0cc73a350 100644
---
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
+++
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
@@ -18,47 +18,95 @@
package org.apache.spark.sql.connector.catalog
import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.catalog.TempVariableManager
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
import org.apache.spark.sql.connector.catalog.transactions.Transaction
/**
* A [[CatalogManager]] decorator that redirects catalog lookups to the
transaction's catalog
* instance when names match, ensuring table loads during analysis are scoped
to the transaction.
- * All mutable state (current catalog, current namespace, loaded catalogs) is
delegated to the
- * wrapped [[CatalogManager]].
+ * All mutable session state is delegated to the wrapped [[CatalogManager]].
*/
-// TODO: Extracting a CatalogManager trait (so this class can implement it
instead of extending
-// CatalogManager) would eliminate the inherited mutable state that this
decorator doesn't use.
private[sql] class TransactionAwareCatalogManager(
delegate: CatalogManager,
- txn: Transaction)
- extends CatalogManager(delegate.defaultSessionCatalog,
delegate.v1SessionCatalog) {
+ txn: Transaction) extends CatalogManager {
- override val tempVariableManager: TempVariableManager =
delegate.tempVariableManager
-
- override def transaction: Option[Transaction] = Some(txn)
-
- override def withTransaction(newTxn: Transaction): CatalogManager =
- throw SparkException.internalError("Cannot nest transactions: a
transaction is already active.")
+ // ---- Underlying state: pure delegation. ----
+ override def defaultSessionCatalog: CatalogPlugin =
delegate.defaultSessionCatalog
+ override def v1SessionCatalog: SessionCatalog = delegate.v1SessionCatalog
+ override def tempVariableManager: TempVariableManager =
delegate.tempVariableManager
+ // ---- Catalog access: redirect to txn catalog when names match. ----
override def catalog(name: String): CatalogPlugin = {
val resolved = delegate.catalog(name)
if (txn.catalog.name() == resolved.name()) txn.catalog else resolved
}
+ override private[sql] def v2SessionCatalog: CatalogPlugin =
delegate.v2SessionCatalog
+
+ override def listCatalogs(pattern: Option[String]): Seq[String] =
+ delegate.listCatalogs(pattern)
+
+ override def transaction: Option[Transaction] = Some(txn)
+
+ override def withTransaction(newTxn: Transaction): CatalogManager =
+ throw SparkException.internalError("Cannot nest transactions: a
transaction is already active.")
+
override def currentCatalog: CatalogPlugin = {
val c = delegate.currentCatalog
if (txn.catalog.name() == c.name()) txn.catalog else c
}
+ override def setCurrentCatalog(catalogName: String): Unit =
+ delegate.setCurrentCatalog(catalogName)
+
override def currentNamespace: Array[String] = delegate.currentNamespace
override def setCurrentNamespace(namespace: Array[String]): Unit =
delegate.setCurrentNamespace(namespace)
- override def setCurrentCatalog(catalogName: String): Unit =
- delegate.setCurrentCatalog(catalogName)
+ override def sessionPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.sessionPathEntries
- override def listCatalogs(pattern: Option[String]): Seq[String] =
- delegate.listCatalogs(pattern)
+ override def storedSessionPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.storedSessionPathEntries
+
+ override def confDefaultPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.confDefaultPathEntries
+
+ override def setSessionPath(entries: Seq[SessionPathEntry]): Unit =
+ delegate.setSessionPath(entries)
+
+ override def clearSessionPath(): Unit = delegate.clearSessionPath()
+
+ override private[sql] def copySessionPathFrom(other: CatalogManager): Unit =
+ delegate.copySessionPathFrom(other)
+
+ override def currentPathString: String = delegate.currentPathString
+
+ override def sqlResolutionPathEntries(
+ pathDefaultCatalog: String,
+ pathDefaultNamespace: Seq[String],
+ expandCatalog: String,
+ expandNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.sqlResolutionPathEntries(
+ pathDefaultCatalog, pathDefaultNamespace, expandCatalog, expandNamespace)
+
+ override def sqlResolutionPathEntries(
+ currentCatalog: String,
+ currentNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.sqlResolutionPathEntries(currentCatalog, currentNamespace)
+
+ override def isSystemSessionOnPath: Boolean = delegate.isSystemSessionOnPath
+
+ override def resolutionPathEntriesForAnalysis(
+ pinnedEntries: Option[Seq[Seq[String]]],
+ viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.resolutionPathEntriesForAnalysis(pinnedEntries,
viewCatalogAndNamespace)
+
+ override def sessionFunctionKindsForUnqualifiedResolution():
Seq[SessionFunctionKind] =
+ delegate.sessionFunctionKindsForUnqualifiedResolution()
+
+ override private[sql] def reset(): Unit = delegate.reset()
}
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
index 65ab822ec841..d3dab281a30c 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
@@ -22,7 +22,7 @@ import
org.apache.spark.sql.catalyst.analysis.resolver.{ResolverExtension, TreeN
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager,
DefaultCatalogManager}
/**
* Verifies that [[Analyzer.withCatalogManager]] propagates all extension
points.
@@ -49,7 +49,7 @@ class AnalyzerExtensionPropagationSuite extends SparkFunSuite
{
}
private def newCatalogManager(): CatalogManager =
- new CatalogManager(
+ new DefaultCatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry))
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
index 183d37f972a2..93906c756570 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
@@ -25,7 +25,7 @@ import
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog,
import org.apache.spark.sql.catalyst.expressions.{Alias, ExpressionInfo}
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{CatalogManager,
FunctionCatalog, Identifier}
+import org.apache.spark.sql.connector.catalog.{CatalogManager,
DefaultCatalogManager, FunctionCatalog, Identifier}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -45,7 +45,7 @@ class LookupFunctionsSuite extends PlanTest {
CatalogDatabase("db1", "", new URI("loc2"), Map.empty),
ignoreIfExists = false)
val catalog = new SessionCatalog(externalCatalog, new
SimpleFunctionRegistry)
- val catalogManager = new CatalogManager(new
CustomV2SessionCatalog(catalog), catalog)
+ val catalogManager = new DefaultCatalogManager(new
CustomV2SessionCatalog(catalog), catalog)
catalogManager.setCurrentNamespace(Array("db1"))
try {
val analyzer = new Analyzer(catalogManager)
@@ -75,7 +75,7 @@ class LookupFunctionsSuite extends PlanTest {
test("SPARK-23486: the getFunction for the Persistent function check") {
val externalCatalog = new CustomInMemoryCatalog
val catalog = new SessionCatalog(externalCatalog,
FunctionRegistry.builtin.clone())
- val catalogManager = new CatalogManager(new
CustomV2SessionCatalog(catalog), catalog)
+ val catalogManager = new DefaultCatalogManager(new
CustomV2SessionCatalog(catalog), catalog)
val analyzer = {
catalog.createDatabase(
CatalogDatabase("default", "", new URI("loc"), Map.empty),
@@ -100,7 +100,7 @@ class LookupFunctionsSuite extends PlanTest {
val externalCatalog = new InMemoryCatalog
val customerFunctionReg = new CustomerFunctionRegistry
val catalog = new SessionCatalog(externalCatalog, customerFunctionReg)
- val catalogManager = new CatalogManager(new
CustomV2SessionCatalog(catalog), catalog)
+ val catalogManager = new DefaultCatalogManager(new
CustomV2SessionCatalog(catalog), catalog)
val analyzer = {
catalog.createDatabase(
CatalogDatabase("default", "", new URI("loc"), Map.empty),
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
index 09eb1f586421..56ee9a36c778 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter,
LocalRelation, LogicalPlan, OneRowRelation, Project}
import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.DefaultCatalogManager
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{BooleanType, StringType, StructType}
@@ -57,7 +57,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends
PlanTest {
}
object Optimize extends Optimizer(
- new CatalogManager(
+ new DefaultCatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry))) {
val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI)
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
index 007a2b3fd058..b36cfb930c56 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry,
EmptyTableFunctionRegistry, FakeV2SessionCatalog}
import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.DefaultCatalogManager
/**
* An optimizer used in test code.
@@ -29,6 +29,6 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
object SimpleTestOptimizer extends SimpleTestOptimizer
class SimpleTestOptimizer extends Optimizer(
- new CatalogManager(
+ new DefaultCatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry,
EmptyTableFunctionRegistry)))
diff --git
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
index 64b2ac91fbd6..199e43d39bbe 100644
---
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
+++
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
@@ -41,7 +41,7 @@ class CatalogManagerSuite extends SparkFunSuite with
SQLHelper {
}
test("CatalogManager should reflect the changes of default catalog") {
- val catalogManager = new CatalogManager(FakeV2SessionCatalog,
createSessionCatalog())
+ val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog,
createSessionCatalog())
assert(catalogManager.currentCatalog.name() ==
CatalogManager.SESSION_CATALOG_NAME)
assert(catalogManager.currentNamespace.sameElements(Array("default")))
@@ -54,7 +54,7 @@ class CatalogManagerSuite extends SparkFunSuite with
SQLHelper {
}
test("CatalogManager should keep the current catalog once set") {
- val catalogManager = new CatalogManager(FakeV2SessionCatalog,
createSessionCatalog())
+ val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog,
createSessionCatalog())
assert(catalogManager.currentCatalog.name() ==
CatalogManager.SESSION_CATALOG_NAME)
withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName) {
catalogManager.setCurrentCatalog("dummy")
@@ -70,7 +70,7 @@ class CatalogManagerSuite extends SparkFunSuite with
SQLHelper {
}
test("current namespace should be updated when switching current catalog") {
- val catalogManager = new CatalogManager(FakeV2SessionCatalog,
createSessionCatalog())
+ val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog,
createSessionCatalog())
withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName) {
catalogManager.setCurrentCatalog("dummy")
assert(catalogManager.currentNamespace.sameElements(Array("a", "b")))
@@ -95,7 +95,7 @@ class CatalogManagerSuite extends SparkFunSuite with
SQLHelper {
CatalogDatabase(
"test", "", v1SessionCatalog.getDefaultDBPath("test"), Map.empty),
ignoreIfExists = false)
- val catalogManager = new CatalogManager(FakeV2SessionCatalog,
v1SessionCatalog)
+ val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog,
v1SessionCatalog)
// If the current catalog is session catalog, setting current namespace
actually sets
// `SessionCatalog.currentDb`.
diff --git
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 913d3d1c0ff5..bf8469f19ee7 100644
---
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.connect.config.Connect
import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Column,
Identifier, InMemoryChangelogCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column,
DefaultCatalogManager, Identifier, InMemoryChangelogCatalog}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -163,7 +163,7 @@ class ProtoToParsedPlanTestSuite
Array.empty[Transform],
emptyProps)
- val catalogManager = new CatalogManager(
+ val catalogManager = new DefaultCatalogManager(
inMemoryCatalog,
new SessionCatalog(
new catalog.InMemoryCatalog(),
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index cc8c9dcb71f5..abfdf3e76787 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.classic.{SparkSession, Strategy,
StreamingCheckpointManager, StreamingQueryManager, UDFRegistration}
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.DefaultCatalogManager
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode,
QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
@@ -161,7 +161,7 @@ abstract class BaseSessionStateBuilder(
protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog)
protected lazy val catalogManager = {
- val cm = new CatalogManager(v2SessionCatalog, catalog)
+ val cm = new DefaultCatalogManager(v2SessionCatalog, catalog)
parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager))
cm
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org