andreaschat-db commented on code in PR #55646:
URL: https://github.com/apache/spark/pull/55646#discussion_r3280766608


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -26,27 +26,93 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.SQLConfHelper
 import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.StringUtils
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
 import org.apache.spark.sql.connector.catalog.transactions.Transaction
 import org.apache.spark.sql.errors.QueryCompilationErrors
 import org.apache.spark.sql.internal.SQLConf
 
 /**
- * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
- * the caller to look up a catalog by name.
+ * A thread-safe contract for managing [[CatalogPlugin]]s. Implementations resolve catalogs by
+ * name and maintain the current catalog and namespace for a session.
  *
  * There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
  * ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
- * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
+ * namespace in both `SessionCatalog` and `CatalogManager`, implementations set/get the
  * current database of `SessionCatalog` when the current catalog is the session catalog.
+ *
+ * Two implementations exist: [[DefaultCatalogManager]] owns the mutable session state;
+ * [[TransactionAwareCatalogManager]] wraps another manager and redirects catalog lookups to the
+ * active transaction's catalog.
  */
 // TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't
 //       need to track current database at all.
-private[sql]
-class CatalogManager(
+private[sql] trait CatalogManager extends SQLConfHelper with Logging {
+
+  // ---- Underlying state exposed by implementations ----
+  def defaultSessionCatalog: CatalogPlugin
+  def v1SessionCatalog: SessionCatalog
+  def tempVariableManager: TempVariableManager
+
+  // ---- Catalog access ----
+  def catalog(name: String): CatalogPlugin
+  private[sql] def v2SessionCatalog: CatalogPlugin
+  def listCatalogs(pattern: Option[String]): Seq[String]
+  def currentCatalog: CatalogPlugin
+  def setCurrentCatalog(catalogName: String): Unit
+  def isCatalogRegistered(name: String): Boolean = {
+    try {
+      catalog(name)
+      true
+    } catch {
+      case _: CatalogNotFoundException => false
+    }
+  }
+
+  // ---- Transactions ----
+  def transaction: Option[Transaction] = None
+
+  def withTransaction(transaction: Transaction): CatalogManager
+
+  // ---- Namespace ----
+  def currentNamespace: Array[String]
+  def setCurrentNamespace(namespace: Array[String]): Unit
+
+  // ---- Session path ----
+  def sessionPathEntries: Option[Seq[SessionPathEntry]]
+  def storedSessionPathEntries: Option[Seq[SessionPathEntry]]
+  def confDefaultPathEntries: Option[Seq[SessionPathEntry]]
+  def setSessionPath(entries: Seq[SessionPathEntry]): Unit
+  def clearSessionPath(): Unit
+  private[sql] def copySessionPathFrom(other: CatalogManager): Unit
+  def currentPathString: String
+  def sqlResolutionPathEntries(
+      pathDefaultCatalog: String,
+      pathDefaultNamespace: Seq[String],
+      expandCatalog: String,
+      expandNamespace: Seq[String]): Seq[Seq[String]]
+  def sqlResolutionPathEntries(
+      currentCatalog: String,
+      currentNamespace: Seq[String]): Seq[Seq[String]]
+  def isSystemSessionOnPath: Boolean
+  def resolutionPathEntriesForAnalysis(
+      pinnedEntries: Option[Seq[Seq[String]]],
+      viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]]
+  def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionFunctionKind]
+
+  // Reset the manager to its initial state. Only used in tests.
+  private[sql] def reset(): Unit
+}
+
+/**
+ * Default [[CatalogManager]] implementation. Owns the mutable session state
+ * (registered catalogs, current catalog/namespace, session path).
+ */
+private[sql] class DefaultCatalogManager(
     val defaultSessionCatalog: CatalogPlugin,
-    val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
+    val v1SessionCatalog: SessionCatalog) extends CatalogManager {

Review Comment:
   Done.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to