aokolnychyi commented on code in PR #55646:
URL: https://github.com/apache/spark/pull/55646#discussion_r3275656555
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -33,21 +33,86 @@ import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
/**
- * A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
- * the caller to look up a catalog by name.
+ * A thread-safe contract for managing [[CatalogPlugin]]s. Implementations resolve catalogs by
+ * name and maintain the current catalog and namespace for a session.
*
* There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
* ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
- * namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
+ * namespace in both `SessionCatalog` and `CatalogManager`, implementations set/get the
* current database of `SessionCatalog` when the current catalog is the session catalog.
+ *
+ * Two implementations exist: [[DefaultCatalogManager]] owns the mutable session state;
+ * [[TransactionAwareCatalogManager]] wraps another manager and redirects catalog lookups to the
+ * active transaction's catalog.
*/
// TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't
// need to track current database at all.
-private[sql]
-class CatalogManager(
+private[sql] trait CatalogManager extends SQLConfHelper with Logging {
+
+ // ---- Underlying state exposed by implementations ----
+ def defaultSessionCatalog: CatalogPlugin
+ def v1SessionCatalog: SessionCatalog
+ def tempVariableManager: TempVariableManager
+
+ // ---- Catalog access ----
+ def catalog(name: String): CatalogPlugin
+ private[sql] def v2SessionCatalog: CatalogPlugin
+ def listCatalogs(pattern: Option[String]): Seq[String]
+ def currentCatalog: CatalogPlugin
+ def setCurrentCatalog(catalogName: String): Unit
+ def isCatalogRegistered(name: String): Boolean = {
+ try {
+ catalog(name)
+ true
+ } catch {
+ case _: CatalogNotFoundException => false
+ }
+ }
+
+ // ---- Transactions ----
+ def transaction: Option[Transaction] = None
+
+ def withTransaction(transaction: Transaction): CatalogManager =
+ new TransactionAwareCatalogManager(this, transaction)
+
+ // ---- Namespace ----
+ def currentNamespace: Array[String]
+ def setCurrentNamespace(namespace: Array[String]): Unit
+
+ // ---- Session path ----
Review Comment:
@cloud-fan, could you help review this, please?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]