This is an automated email from the ASF dual-hosted git repository.

cloud-fan pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new a79ea5dd1f76 [SPARK-56697][SQL][DML] Refactor Catalog Manager
a79ea5dd1f76 is described below

commit a79ea5dd1f7667b78b5074cc1b0da8acda62be8c
Author: Andreas Chatzistergiou <[email protected]>
AuthorDate: Mon May 25 14:40:19 2026 +0800

    [SPARK-56697][SQL][DML] Refactor Catalog Manager
    
    ### What changes were proposed in this pull request?
    
    Currently, the `TransactionAwareCatalogManager` is a decorator that extends the 
concrete `CatalogManager` class. This is risky: a method newly added to 
`CatalogManager` is silently inherited by `TransactionAwareCatalogManager` instead 
of being delegated. To resolve this, we introduce a `CatalogManager` trait that 
every implementation must implement, so the compiler flags any missing method.
    
    Furthermore, the PR fixes a bug: every `withTransaction` call constructed a 
TACM whose super-constructor re-ran 
`v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)`. This rebound 
the v1 SessionCatalog's session-function-kinds source to the transient TACM. 
Because the old TACM never overrode `sessionFunctionKindsForUnqualifiedResolution`, 
any later unqualified function resolution that took the path-kinds route 
observed the TACM's own (always-empty) `_currentCatalog...` state [...]
    
    ### Why are the changes needed?
    
    Ensures, at compile time, that any method newly added to the `CatalogManager` 
trait is implemented by both the `DefaultCatalogManager` and the 
`TransactionAwareCatalogManager`.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. It fixes the session-function-kinds rebinding issue described above.
    
    ### How was this patch tested?
    
    This is a refactoring; it is covered by the existing tests.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    Claude Sonnet 4.6.
    
    Closes #55646 from andreaschat-db/dsv2TransactionCatalogManagerRefactor.
    
    Authored-by: Andreas Chatzistergiou <[email protected]>
    Signed-off-by: Wenchen Fan <[email protected]>
    (cherry picked from commit e0fff75f68aa2f9bf39f2910820f235ed2cab3cb)
    Signed-off-by: Wenchen Fan <[email protected]>
---
 .../spark/sql/catalyst/analysis/Analyzer.scala     |   4 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |   5 +-
 .../spark/sql/catalyst/util/GeneratedColumn.scala  |   4 +-
 .../catalyst/util/ResolveDefaultColumnsUtil.scala  |   4 +-
 .../sql/connector/catalog/CatalogManager.scala     | 142 ++++++++++++++-------
 .../catalog/TransactionAwareCatalogManager.scala   |  82 +++++++++---
 .../AnalyzerExtensionPropagationSuite.scala        |   4 +-
 .../catalyst/analysis/LookupFunctionsSuite.scala   |   8 +-
 .../OptimizerStructuralIntegrityCheckerSuite.scala |   4 +-
 .../catalyst/optimizer/SimpleTestOptimizer.scala   |   4 +-
 .../connector/catalog/CatalogManagerSuite.scala    |   8 +-
 .../sql/connect/ProtoToParsedPlanTestSuite.scala   |   4 +-
 .../sql/internal/BaseSessionStateBuilder.scala     |   4 +-
 13 files changed, 190 insertions(+), 87 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index d123d36c23b2..3f8ce7e8b70a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -75,7 +75,7 @@ import org.apache.spark.util.ArrayImplicits._
  * functions.
  */
 object SimpleAnalyzer extends Analyzer(
-  new CatalogManager(
+  new DefaultCatalogManager(
     FakeV2SessionCatalog,
     new SessionCatalog(
       new InMemoryCatalog,
@@ -323,7 +323,7 @@ class Analyzer(
 
   // Only for tests.
   def this(catalog: SessionCatalog) = {
-    this(new CatalogManager(FakeV2SessionCatalog, catalog))
+    this(new DefaultCatalogManager(FakeV2SessionCatalog, catalog))
   }
 
   def getRelationResolution: RelationResolution = relationResolution
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 9e5a2176612c..69af5ab80eec 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -121,7 +121,7 @@ class SessionCatalog(
 
   /**
    * Live PATH for session function kinds. Set from
-   * [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor 
via
+   * [[org.apache.spark.sql.connector.catalog.DefaultCatalogManager]]'s 
constructor via
    * [[bindCatalogManagerForSessionFunctionKinds]] so unqualified lookups and 
the security check
    * that blocks temp functions from shadowing builtins read the effective SQL 
PATH (post-`SET
    * PATH`, with [[SQLConf.DEFAULT_PATH]] and [[SQLConf.defaultPathOrder]] 
fallbacks already
@@ -135,7 +135,8 @@ class SessionCatalog(
 
   /**
    * Wire live PATH-derived session function kinds from the session 
[[CatalogManager]].
-   * Called once from 
[[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor.
+   * Called once from 
[[org.apache.spark.sql.connector.catalog.DefaultCatalogManager]]'s
+   * constructor.
    */
   private[sql] def bindCatalogManagerForSessionFunctionKinds(cm: 
CatalogManager): Unit = {
     catalogManagerForSessionFunctionKinds = Some(cm)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
index 8d88b05546ed..793c994fdd43 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/GeneratedColumn.scala
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
 import 
org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, 
TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{DefaultCatalogManager, 
Identifier, TableCatalog, TableCatalogCapability}
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -205,5 +205,5 @@ object GeneratedColumn {
  * Analyzer for processing generated column expressions using built-in 
functions only.
  */
 object GeneratedColumnAnalyzer extends Analyzer(
-  new CatalogManager(BuiltInFunctionCatalog, 
BuiltInFunctionCatalog.v1Catalog)) {
+  new DefaultCatalogManager(BuiltInFunctionCatalog, 
BuiltInFunctionCatalog.v1Catalog)) {
 }
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
index 9c077630f33d..491fb7f24a22 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ResolveDefaultColumnsUtil.scala
@@ -34,7 +34,7 @@ import 
org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
 import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, 
DefaultValue, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, 
DefaultCatalogManager, DefaultValue, FunctionCatalog, Identifier, TableCatalog, 
TableCatalogCapability}
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
 import org.apache.spark.sql.internal.SQLConf
@@ -597,7 +597,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
    * This is an Analyzer for processing default column values using built-in 
functions only.
    */
   object DefaultColumnAnalyzer extends Analyzer(
-    new CatalogManager(BuiltInFunctionCatalog, 
BuiltInFunctionCatalog.v1Catalog)) {
+    new DefaultCatalogManager(BuiltInFunctionCatalog, 
BuiltInFunctionCatalog.v1Catalog)) {
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
index 3aad52dbd1d0..4dd8af5eb37e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala
@@ -26,34 +26,100 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, 
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
 import org.apache.spark.sql.connector.catalog.transactions.Transaction
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 
 /**
- * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered 
catalogs, and allow
- * the caller to look up a catalog by name.
+ * A thread-safe contract for managing [[CatalogPlugin]]s. Implementations 
resolve catalogs by
+ * name and maintain the current catalog and namespace for a session.
  *
  * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 
catalog API. They
  * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To 
avoid tracking current
- * namespace in both `SessionCatalog` and `CatalogManger`, we let 
`CatalogManager` to set/get
+ * namespace in both `SessionCatalog` and `CatalogManager`, implementations 
set/get the
  * current database of `SessionCatalog` when the current catalog is the 
session catalog.
+ *
+ * Two implementations exist: [[DefaultCatalogManager]] owns the mutable 
session state;
+ * [[TransactionAwareCatalogManager]] wraps another manager and redirects 
catalog lookups to the
+ * active transaction's catalog.
  */
 // TODO: all commands should look up table from the current catalog. The 
`SessionCatalog` doesn't
 //       need to track current database at all.
-private[sql]
-class CatalogManager(
-    val defaultSessionCatalog: CatalogPlugin,
-    val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
+private[sql] trait CatalogManager extends SQLConfHelper with Logging {
+
+  // ---- Underlying state exposed by implementations ----
+  def defaultSessionCatalog: CatalogPlugin
+  def v1SessionCatalog: SessionCatalog
+  def tempVariableManager: TempVariableManager
+
+  // ---- Catalog access ----
+  def catalog(name: String): CatalogPlugin
+  private[sql] def v2SessionCatalog: CatalogPlugin
+  def listCatalogs(pattern: Option[String]): Seq[String]
+  def currentCatalog: CatalogPlugin
+  def setCurrentCatalog(catalogName: String): Unit
+  def isCatalogRegistered(name: String): Boolean = {
+    try {
+      catalog(name)
+      true
+    } catch {
+      case _: CatalogNotFoundException => false
+    }
+  }
+
+  // ---- Transactions ----
+  def transaction: Option[Transaction] = None
+
+  def withTransaction(transaction: Transaction): CatalogManager
+
+  // ---- Namespace ----
+  def currentNamespace: Array[String]
+  def setCurrentNamespace(namespace: Array[String]): Unit
+
+  // ---- Session path ----
+  def sessionPathEntries: Option[Seq[SessionPathEntry]]
+  def storedSessionPathEntries: Option[Seq[SessionPathEntry]]
+  def confDefaultPathEntries: Option[Seq[SessionPathEntry]]
+  def setSessionPath(entries: Seq[SessionPathEntry]): Unit
+  def clearSessionPath(): Unit
+  private[sql] def copySessionPathFrom(other: CatalogManager): Unit
+  def currentPathString: String
+  def sqlResolutionPathEntries(
+      pathDefaultCatalog: String,
+      pathDefaultNamespace: Seq[String],
+      expandCatalog: String,
+      expandNamespace: Seq[String]): Seq[Seq[String]]
+  def sqlResolutionPathEntries(
+      currentCatalog: String,
+      currentNamespace: Seq[String]): Seq[Seq[String]]
+  def isSystemSessionOnPath: Boolean
+  def resolutionPathEntriesForAnalysis(
+      pinnedEntries: Option[Seq[Seq[String]]],
+      viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]]
+  def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionFunctionKind]
+
+  // Reset the manager to its initial state. Only used in tests.
+  private[sql] def reset(): Unit
+}
+
+/**
+ * Default [[CatalogManager]] implementation. Owns the mutable session state
+ * (registered catalogs, current catalog/namespace, session path).
+ */
+private[sql] class DefaultCatalogManager(
+    override val defaultSessionCatalog: CatalogPlugin,
+    override val v1SessionCatalog: SessionCatalog) extends CatalogManager {
   import CatalogManager.SESSION_CATALOG_NAME
   import CatalogV2Util._
 
   private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]
 
   // TODO: create a real SYSTEM catalog to host `TempVariableManager` under 
the SESSION namespace.
-  val tempVariableManager: TempVariableManager = new TempVariableManager
+  override val tempVariableManager: TempVariableManager = new 
TempVariableManager
 
   // Wire `SessionCatalog`'s fast-path kinds to the live SQL PATH. The kinds 
list itself is
   // pure data conversion (system entries from the path, in path order); the 
*decision* to use
@@ -61,7 +127,7 @@ class CatalogManager(
   // [[CatalogManager.systemFunctionKindsFromPath]]).
   v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)
 
-  def catalog(name: String): CatalogPlugin = synchronized {
+  override def catalog(name: String): CatalogPlugin = synchronized {
     if (name.equalsIgnoreCase(SESSION_CATALOG_NAME)) {
       v2SessionCatalog
     } else {
@@ -69,20 +135,6 @@ class CatalogManager(
     }
   }
 
-  def transaction: Option[Transaction] = None
-
-  def withTransaction(transaction: Transaction): CatalogManager =
-    new TransactionAwareCatalogManager(this, transaction)
-
-  def isCatalogRegistered(name: String): Boolean = {
-    try {
-      catalog(name)
-      true
-    } catch {
-      case _: CatalogNotFoundException => false
-    }
-  }
-
   private def loadV2SessionCatalog(): CatalogPlugin = {
     Catalogs.load(SESSION_CATALOG_NAME, conf) match {
       case extension: CatalogExtension =>
@@ -101,16 +153,19 @@ class CatalogManager(
    * This happens when the source implementation extends the v2 TableProvider 
API and is not listed
    * in the fallback configuration, spark.sql.sources.useV1SourceList
    */
-  private[sql] def v2SessionCatalog: CatalogPlugin = {
+  override private[sql] def v2SessionCatalog: CatalogPlugin = {
     conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION) match {
       case "builtin" => defaultSessionCatalog
       case _ => catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, 
loadV2SessionCatalog())
     }
   }
 
+  override def withTransaction(transaction: Transaction): CatalogManager =
+    new TransactionAwareCatalogManager(this, transaction)
+
   private var _currentNamespace: Option[Array[String]] = None
 
-  def currentNamespace: Array[String] = {
+  override def currentNamespace: Array[String] = {
     val defaultNamespace = if (currentCatalog.name() == SESSION_CATALOG_NAME) {
       Array(v1SessionCatalog.getCurrentDatabase)
     } else {
@@ -132,7 +187,7 @@ class CatalogManager(
     }
   }
 
-  def setCurrentNamespace(namespace: Array[String]): Unit = {
+  override def setCurrentNamespace(namespace: Array[String]): Unit = {
     // SPARK-56939: do NOT hold [[CatalogManager]]'s intrinsic lock across the 
callbacks below.
     // [[v1SessionCatalog.setCurrentDatabaseWithNameCheck]] briefly 
synchronizes on
     // [[SessionCatalog]], and concurrent unqualified function resolution 
acquires the
@@ -175,8 +230,6 @@ class CatalogManager(
     }
   }
 
-  import CatalogManager.SessionPathEntry
-
   private var _sessionPath: Option[Seq[SessionPathEntry]] = None
 
   /**
@@ -196,13 +249,14 @@ class CatalogManager(
    * [[currentCatalog]] falls back to [[SQLConf#DEFAULT_CATALOG]]). Returns 
`None` when
    * [[SQLConf#PATH_ENABLED]] is false or both sources are empty.
    */
-  def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
+  override def sessionPathEntries: Option[Seq[SessionPathEntry]] = 
synchronized {
     if (!conf.pathEnabled) None
     else _sessionPath.orElse(confDefaultPathEntries)
   }
 
   /** Raw `_sessionPath` (post-`SET PATH`), without the 
[[SQLConf#DEFAULT_PATH]] fallback. */
-  def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { 
_sessionPath }
+  override def storedSessionPathEntries: Option[Seq[SessionPathEntry]] =
+    synchronized { _sessionPath }
 
   /**
    * Parsed and expanded [[SQLConf#DEFAULT_PATH]] value, or `None` when the 
conf is empty.
@@ -216,7 +270,7 @@ class CatalogManager(
    * `USE SCHEMA`) is dead code rather than an error. Cached so the hot path 
is a single
    * atomic load on conf-stable sessions.
    */
-  def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
+  override def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
     val confValue = conf.defaultPath
     if (confValue == null || confValue.trim.isEmpty) {
       confDefaultPathCache.set(None)
@@ -236,15 +290,15 @@ class CatalogManager(
     }
   }
 
-  def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
+  override def setSessionPath(entries: Seq[SessionPathEntry]): Unit = 
synchronized {
     _sessionPath = Some(entries)
   }
 
-  def clearSessionPath(): Unit = synchronized {
+  override def clearSessionPath(): Unit = synchronized {
     _sessionPath = None
   }
 
-  private[sql] def copySessionPathFrom(other: CatalogManager): Unit = 
synchronized {
+  override private[sql] def copySessionPathFrom(other: CatalogManager): Unit = 
synchronized {
     _sessionPath = other.storedSessionPathEntries
   }
 
@@ -263,7 +317,7 @@ class CatalogManager(
    * new SC->CM ordering must take `currentPathString` (or any other CM->SC 
nest) into
    * account to avoid resurrecting the deadlock.
    */
-  def currentPathString: String = synchronized {
+  override def currentPathString: String = synchronized {
     import CatalogV2Implicits._
     sessionPathEntries match {
       case Some(entries) =>
@@ -282,7 +336,7 @@ class CatalogManager(
    * When PATH is in effect (stored or via the [[SQLConf#DEFAULT_PATH]] conf), 
uses the
    * resolved entries.
    */
-  def sqlResolutionPathEntries(
+  override def sqlResolutionPathEntries(
       pathDefaultCatalog: String,
       pathDefaultNamespace: Seq[String],
       expandCatalog: String,
@@ -299,7 +353,7 @@ class CatalogManager(
   }
 
   /** Session-catalog overload. */
-  def sqlResolutionPathEntries(
+  override def sqlResolutionPathEntries(
       currentCatalog: String,
       currentNamespace: Seq[String]): Seq[Seq[String]] =
     sqlResolutionPathEntries(
@@ -330,7 +384,7 @@ class CatalogManager(
    * 
[[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]])
    * MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this 
method.
    */
-  def sessionFunctionKindsForUnqualifiedResolution(): 
Seq[SessionCatalog.SessionFunctionKind] = {
+  override def sessionFunctionKindsForUnqualifiedResolution(): 
Seq[SessionFunctionKind] = {
     // SPARK-56939: read v1's current database before taking the CM lock; see 
the method
     // doc for why the resulting staleness is harmless for the kinds list.
     val v1CurrentDb = v1SessionCatalog.getCurrentDatabase
@@ -358,7 +412,7 @@ class CatalogManager(
    * invariant ever changes, this short-circuit must be revisited.
    * Inspecting effective entries directly avoids loading the configured 
default catalog.
    */
-  def isSystemSessionOnPath: Boolean = synchronized {
+  override def isSystemSessionOnPath: Boolean = synchronized {
     if (!conf.pathEnabled) return true
     sessionPathEntries match {
       case None => true
@@ -385,7 +439,7 @@ class CatalogManager(
    *                               (typically 
`AnalysisContext.catalogAndNamespace`); empty when
    *                               not resolving a view body.
    */
-  def resolutionPathEntriesForAnalysis(
+  override def resolutionPathEntriesForAnalysis(
       pinnedEntries: Option[Seq[Seq[String]]],
       viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] = {
     pinnedEntries match {
@@ -409,11 +463,11 @@ class CatalogManager(
 
   private var _currentCatalogName: Option[String] = None
 
-  def currentCatalog: CatalogPlugin = synchronized {
+  override def currentCatalog: CatalogPlugin = synchronized {
     
catalog(_currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG)))
   }
 
-  def setCurrentCatalog(catalogName: String): Unit = {
+  override def setCurrentCatalog(catalogName: String): Unit = {
     // SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting 
[[CatalogManager]]'s lock
     // across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on
     // [[SessionCatalog]]) to prevent a lock-order inversion with concurrent 
unqualified
@@ -451,7 +505,7 @@ class CatalogManager(
     }
   }
 
-  def listCatalogs(pattern: Option[String]): Seq[String] = {
+  override def listCatalogs(pattern: Option[String]): Seq[String] = {
     val allCatalogs = (synchronized(catalogs.keys.toSeq) :+ 
SESSION_CATALOG_NAME).distinct.sorted
     pattern.map(StringUtils.filterPattern(allCatalogs, 
_)).getOrElse(allCatalogs)
   }
@@ -463,7 +517,7 @@ class CatalogManager(
   // calls back into [[v1SessionCatalog]]. Test-only callers don't race 
against unqualified
   // function resolution today, but keeping the contract symmetric prevents 
future test
   // helpers (e.g. session reset in a concurrent harness) from reintroducing 
the cycle.
-  private[sql] def reset(): Unit = {
+  override private[sql] def reset(): Unit = {
     synchronized {
       catalogs.clear()
       _currentNamespace = None
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
index 70079357b6dd..e7a0cc73a350 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala
@@ -18,47 +18,95 @@
 package org.apache.spark.sql.connector.catalog
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.catalog.TempVariableManager
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, 
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
 import org.apache.spark.sql.connector.catalog.transactions.Transaction
 
 /**
  * A [[CatalogManager]] decorator that redirects catalog lookups to the 
transaction's catalog
  * instance when names match, ensuring table loads during analysis are scoped 
to the transaction.
- * All mutable state (current catalog, current namespace, loaded catalogs) is 
delegated to the
- * wrapped [[CatalogManager]].
+ * All mutable session state is delegated to the wrapped [[CatalogManager]].
  */
-// TODO: Extracting a CatalogManager trait (so this class can implement it 
instead of extending
-//  CatalogManager) would eliminate the inherited mutable state that this 
decorator doesn't use.
 private[sql] class TransactionAwareCatalogManager(
     delegate: CatalogManager,
-    txn: Transaction)
-  extends CatalogManager(delegate.defaultSessionCatalog, 
delegate.v1SessionCatalog) {
+    txn: Transaction) extends CatalogManager {
 
-  override val tempVariableManager: TempVariableManager = 
delegate.tempVariableManager
-
-  override def transaction: Option[Transaction] = Some(txn)
-
-  override def withTransaction(newTxn: Transaction): CatalogManager =
-    throw SparkException.internalError("Cannot nest transactions: a 
transaction is already active.")
+  // ---- Underlying state: pure delegation. ----
+  override def defaultSessionCatalog: CatalogPlugin = 
delegate.defaultSessionCatalog
+  override def v1SessionCatalog: SessionCatalog = delegate.v1SessionCatalog
+  override def tempVariableManager: TempVariableManager = 
delegate.tempVariableManager
 
+  // ---- Catalog access: redirect to txn catalog when names match. ----
   override def catalog(name: String): CatalogPlugin = {
     val resolved = delegate.catalog(name)
     if (txn.catalog.name() == resolved.name()) txn.catalog else resolved
   }
 
+  override private[sql] def v2SessionCatalog: CatalogPlugin = 
delegate.v2SessionCatalog
+
+  override def listCatalogs(pattern: Option[String]): Seq[String] =
+    delegate.listCatalogs(pattern)
+
+  override def transaction: Option[Transaction] = Some(txn)
+
+  override def withTransaction(newTxn: Transaction): CatalogManager =
+    throw SparkException.internalError("Cannot nest transactions: a 
transaction is already active.")
+
   override def currentCatalog: CatalogPlugin = {
     val c = delegate.currentCatalog
     if (txn.catalog.name() == c.name()) txn.catalog else c
   }
 
+  override def setCurrentCatalog(catalogName: String): Unit =
+    delegate.setCurrentCatalog(catalogName)
+
   override def currentNamespace: Array[String] = delegate.currentNamespace
 
   override def setCurrentNamespace(namespace: Array[String]): Unit =
     delegate.setCurrentNamespace(namespace)
 
-  override def setCurrentCatalog(catalogName: String): Unit =
-    delegate.setCurrentCatalog(catalogName)
+  override def sessionPathEntries: Option[Seq[SessionPathEntry]] =
+    delegate.sessionPathEntries
 
-  override def listCatalogs(pattern: Option[String]): Seq[String] =
-    delegate.listCatalogs(pattern)
+  override def storedSessionPathEntries: Option[Seq[SessionPathEntry]] =
+    delegate.storedSessionPathEntries
+
+  override def confDefaultPathEntries: Option[Seq[SessionPathEntry]] =
+    delegate.confDefaultPathEntries
+
+  override def setSessionPath(entries: Seq[SessionPathEntry]): Unit =
+    delegate.setSessionPath(entries)
+
+  override def clearSessionPath(): Unit = delegate.clearSessionPath()
+
+  override private[sql] def copySessionPathFrom(other: CatalogManager): Unit =
+    delegate.copySessionPathFrom(other)
+
+  override def currentPathString: String = delegate.currentPathString
+
+  override def sqlResolutionPathEntries(
+      pathDefaultCatalog: String,
+      pathDefaultNamespace: Seq[String],
+      expandCatalog: String,
+      expandNamespace: Seq[String]): Seq[Seq[String]] =
+    delegate.sqlResolutionPathEntries(
+      pathDefaultCatalog, pathDefaultNamespace, expandCatalog, expandNamespace)
+
+  override def sqlResolutionPathEntries(
+      currentCatalog: String,
+      currentNamespace: Seq[String]): Seq[Seq[String]] =
+    delegate.sqlResolutionPathEntries(currentCatalog, currentNamespace)
+
+  override def isSystemSessionOnPath: Boolean = delegate.isSystemSessionOnPath
+
+  override def resolutionPathEntriesForAnalysis(
+      pinnedEntries: Option[Seq[Seq[String]]],
+      viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] =
+    delegate.resolutionPathEntriesForAnalysis(pinnedEntries, 
viewCatalogAndNamespace)
+
+  override def sessionFunctionKindsForUnqualifiedResolution(): 
Seq[SessionFunctionKind] =
+    delegate.sessionFunctionKindsForUnqualifiedResolution()
+
+  override private[sql] def reset(): Unit = delegate.reset()
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
index 65ab822ec841..d3dab281a30c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala
@@ -22,7 +22,7 @@ import 
org.apache.spark.sql.catalyst.analysis.resolver.{ResolverExtension, TreeN
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.{CatalogManager, 
DefaultCatalogManager}
 
 /**
  * Verifies that [[Analyzer.withCatalogManager]] propagates all extension 
points.
@@ -49,7 +49,7 @@ class AnalyzerExtensionPropagationSuite extends SparkFunSuite 
{
   }
 
   private def newCatalogManager(): CatalogManager =
-    new CatalogManager(
+    new DefaultCatalogManager(
       FakeV2SessionCatalog,
       new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry))
 
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
index 183d37f972a2..93906c756570 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/LookupFunctionsSuite.scala
@@ -25,7 +25,7 @@ import 
org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog,
 import org.apache.spark.sql.catalyst.expressions.{Alias, ExpressionInfo}
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.connector.catalog.{CatalogManager, 
FunctionCatalog, Identifier}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, 
DefaultCatalogManager, FunctionCatalog, Identifier}
 import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -45,7 +45,7 @@ class LookupFunctionsSuite extends PlanTest {
           CatalogDatabase("db1", "", new URI("loc2"), Map.empty),
           ignoreIfExists = false)
         val catalog = new SessionCatalog(externalCatalog, new 
SimpleFunctionRegistry)
-        val catalogManager = new CatalogManager(new 
CustomV2SessionCatalog(catalog), catalog)
+        val catalogManager = new DefaultCatalogManager(new 
CustomV2SessionCatalog(catalog), catalog)
         catalogManager.setCurrentNamespace(Array("db1"))
         try {
           val analyzer = new Analyzer(catalogManager)
@@ -75,7 +75,7 @@ class LookupFunctionsSuite extends PlanTest {
   test("SPARK-23486: the getFunction for the Persistent function check") {
     val externalCatalog = new CustomInMemoryCatalog
     val catalog = new SessionCatalog(externalCatalog, 
FunctionRegistry.builtin.clone())
-    val catalogManager = new CatalogManager(new 
CustomV2SessionCatalog(catalog), catalog)
+    val catalogManager = new DefaultCatalogManager(new 
CustomV2SessionCatalog(catalog), catalog)
     val analyzer = {
       catalog.createDatabase(
         CatalogDatabase("default", "", new URI("loc"), Map.empty),
@@ -100,7 +100,7 @@ class LookupFunctionsSuite extends PlanTest {
     val externalCatalog = new InMemoryCatalog
     val customerFunctionReg = new CustomerFunctionRegistry
     val catalog = new SessionCatalog(externalCatalog, customerFunctionReg)
-    val catalogManager = new CatalogManager(new 
CustomV2SessionCatalog(catalog), catalog)
+    val catalogManager = new DefaultCatalogManager(new 
CustomV2SessionCatalog(catalog), catalog)
     val analyzer = {
       catalog.createDatabase(
         CatalogDatabase("default", "", new URI("loc"), Map.empty),
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
index 09eb1f586421..56ee9a36c778 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizerStructuralIntegrityCheckerSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.PlanTest
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, 
LocalRelation, LogicalPlan, OneRowRelation, Project}
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.DefaultCatalogManager
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{BooleanType, StringType, StructType}
 
@@ -57,7 +57,7 @@ class OptimizerStructuralIntegrityCheckerSuite extends 
PlanTest {
   }
 
   object Optimize extends Optimizer(
-    new CatalogManager(
+    new DefaultCatalogManager(
       FakeV2SessionCatalog,
       new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry))) {
     val newBatch = Batch("OptimizeRuleBreakSI", Once, OptimizeRuleBreakSI)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
index 007a2b3fd058..b36cfb930c56 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/SimpleTestOptimizer.scala
@@ -18,7 +18,7 @@ package org.apache.spark.sql.catalyst.optimizer
 
 import org.apache.spark.sql.catalyst.analysis.{EmptyFunctionRegistry, 
EmptyTableFunctionRegistry, FakeV2SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.DefaultCatalogManager
 
 /**
  * An optimizer used in test code.
@@ -29,6 +29,6 @@ import org.apache.spark.sql.connector.catalog.CatalogManager
 object SimpleTestOptimizer extends SimpleTestOptimizer
 
 class SimpleTestOptimizer extends Optimizer(
-  new CatalogManager(
+  new DefaultCatalogManager(
     FakeV2SessionCatalog,
     new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry, 
EmptyTableFunctionRegistry)))
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
index 64b2ac91fbd6..199e43d39bbe 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/CatalogManagerSuite.scala
@@ -41,7 +41,7 @@ class CatalogManagerSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("CatalogManager should reflect the changes of default catalog") {
-    val catalogManager = new CatalogManager(FakeV2SessionCatalog, 
createSessionCatalog())
+    val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog, 
createSessionCatalog())
     assert(catalogManager.currentCatalog.name() == 
CatalogManager.SESSION_CATALOG_NAME)
     assert(catalogManager.currentNamespace.sameElements(Array("default")))
 
@@ -54,7 +54,7 @@ class CatalogManagerSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("CatalogManager should keep the current catalog once set") {
-    val catalogManager = new CatalogManager(FakeV2SessionCatalog, 
createSessionCatalog())
+    val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog, 
createSessionCatalog())
     assert(catalogManager.currentCatalog.name() == 
CatalogManager.SESSION_CATALOG_NAME)
     withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName) {
       catalogManager.setCurrentCatalog("dummy")
@@ -70,7 +70,7 @@ class CatalogManagerSuite extends SparkFunSuite with 
SQLHelper {
   }
 
   test("current namespace should be updated when switching current catalog") {
-    val catalogManager = new CatalogManager(FakeV2SessionCatalog, 
createSessionCatalog())
+    val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog, 
createSessionCatalog())
     withSQLConf("spark.sql.catalog.dummy" -> classOf[DummyCatalog].getName) {
       catalogManager.setCurrentCatalog("dummy")
       assert(catalogManager.currentNamespace.sameElements(Array("a", "b")))
@@ -95,7 +95,7 @@ class CatalogManagerSuite extends SparkFunSuite with 
SQLHelper {
       CatalogDatabase(
         "test", "", v1SessionCatalog.getDefaultDBPath("test"), Map.empty),
       ignoreIfExists = false)
-    val catalogManager = new CatalogManager(FakeV2SessionCatalog, 
v1SessionCatalog)
+    val catalogManager = new DefaultCatalogManager(FakeV2SessionCatalog, 
v1SessionCatalog)
 
     // If the current catalog is session catalog, setting current namespace 
actually sets
     // `SessionCatalog.currentDb`.
diff --git 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
index 8cfa219d370c..77e3730d72b9 100644
--- 
a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
+++ 
b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/ProtoToParsedPlanTestSuite.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.planner.SparkConnectPlanner
-import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, 
Identifier, InMemoryChangelogCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, 
DefaultCatalogManager, Identifier, InMemoryChangelogCatalog}
 import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -160,7 +160,7 @@ class ProtoToParsedPlanTestSuite extends SharedSparkSession 
with ResourceHelper
       Array.empty[Transform],
       emptyProps)
 
-    val catalogManager = new CatalogManager(
+    val catalogManager = new DefaultCatalogManager(
       inMemoryCatalog,
       new SessionCatalog(
         new catalog.InMemoryCatalog(),
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index c82651595bc5..4838e6d60f72 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.classic.{SparkSession, Strategy, 
StreamingCheckpointManager, StreamingQueryManager, UDFRegistration}
-import org.apache.spark.sql.connector.catalog.CatalogManager
+import org.apache.spark.sql.connector.catalog.DefaultCatalogManager
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.execution.{ColumnarRule, CommandExecutionMode, 
QueryExecution, SparkOptimizer, SparkPlanner, SparkSqlParser}
 import org.apache.spark.sql.execution.adaptive.AdaptiveRulesHolder
@@ -163,7 +163,7 @@ abstract class BaseSessionStateBuilder(
   protected lazy val v2SessionCatalog = new V2SessionCatalog(catalog)
 
   protected lazy val catalogManager = {
-    val cm = new CatalogManager(v2SessionCatalog, catalog)
+    val cm = new DefaultCatalogManager(v2SessionCatalog, catalog)
     parentState.foreach(ps => cm.copySessionPathFrom(ps.catalogManager))
     cm
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to