andreaschat-db commented on code in PR #55646:
URL: https://github.com/apache/spark/pull/55646#discussion_r3280775988
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala:
##########
@@ -18,47 +18,95 @@
package org.apache.spark.sql.connector.catalog
import org.apache.spark.SparkException
-import org.apache.spark.sql.catalyst.catalog.TempVariableManager
+import org.apache.spark.sql.catalyst.catalog.{SessionCatalog,
TempVariableManager}
+import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
+import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
import org.apache.spark.sql.connector.catalog.transactions.Transaction
/**
* A [[CatalogManager]] decorator that redirects catalog lookups to the
transaction's catalog
* instance when names match, ensuring table loads during analysis are scoped
to the transaction.
- * All mutable state (current catalog, current namespace, loaded catalogs) is
delegated to the
- * wrapped [[CatalogManager]].
+ * All mutable session state is delegated to the wrapped [[CatalogManager]].
*/
-// TODO: Extracting a CatalogManager trait (so this class can implement it
instead of extending
-// CatalogManager) would eliminate the inherited mutable state that this
decorator doesn't use.
private[sql] class TransactionAwareCatalogManager(
delegate: CatalogManager,
- txn: Transaction)
- extends CatalogManager(delegate.defaultSessionCatalog,
delegate.v1SessionCatalog) {
+ txn: Transaction) extends CatalogManager {
- override val tempVariableManager: TempVariableManager =
delegate.tempVariableManager
-
- override def transaction: Option[Transaction] = Some(txn)
-
- override def withTransaction(newTxn: Transaction): CatalogManager =
- throw SparkException.internalError("Cannot nest transactions: a
transaction is already active.")
+ // ---- Underlying state: pure delegation. ----
+ override def defaultSessionCatalog: CatalogPlugin =
delegate.defaultSessionCatalog
+ override def v1SessionCatalog: SessionCatalog = delegate.v1SessionCatalog
+ override def tempVariableManager: TempVariableManager =
delegate.tempVariableManager
+ // ---- Catalog access: redirect to txn catalog when names match. ----
override def catalog(name: String): CatalogPlugin = {
val resolved = delegate.catalog(name)
if (txn.catalog.name() == resolved.name()) txn.catalog else resolved
}
+ override private[sql] def v2SessionCatalog: CatalogPlugin =
delegate.v2SessionCatalog
+
+ override def listCatalogs(pattern: Option[String]): Seq[String] =
+ delegate.listCatalogs(pattern)
+
+ override def transaction: Option[Transaction] = Some(txn)
+
+ override def withTransaction(newTxn: Transaction): CatalogManager =
+ throw SparkException.internalError("Cannot nest transactions: a
transaction is already active.")
+
override def currentCatalog: CatalogPlugin = {
val c = delegate.currentCatalog
if (txn.catalog.name() == c.name()) txn.catalog else c
}
+ override def setCurrentCatalog(catalogName: String): Unit =
+ delegate.setCurrentCatalog(catalogName)
+
override def currentNamespace: Array[String] = delegate.currentNamespace
override def setCurrentNamespace(namespace: Array[String]): Unit =
delegate.setCurrentNamespace(namespace)
- override def setCurrentCatalog(catalogName: String): Unit =
- delegate.setCurrentCatalog(catalogName)
+ override def sessionPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.sessionPathEntries
- override def listCatalogs(pattern: Option[String]): Seq[String] =
- delegate.listCatalogs(pattern)
+ override def storedSessionPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.storedSessionPathEntries
+
+ override def confDefaultPathEntries: Option[Seq[SessionPathEntry]] =
+ delegate.confDefaultPathEntries
+
+ override def setSessionPath(entries: Seq[SessionPathEntry]): Unit =
+ delegate.setSessionPath(entries)
+
+ override def clearSessionPath(): Unit = delegate.clearSessionPath()
+
+ override private[sql] def copySessionPathFrom(other: CatalogManager): Unit =
+ delegate.copySessionPathFrom(other)
+
+ override def currentPathString: String = delegate.currentPathString
+
+ override def sqlResolutionPathEntries(
+ pathDefaultCatalog: String,
+ pathDefaultNamespace: Seq[String],
+ expandCatalog: String,
+ expandNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.sqlResolutionPathEntries(
+ pathDefaultCatalog, pathDefaultNamespace, expandCatalog, expandNamespace)
+
+ override def sqlResolutionPathEntries(
+ currentCatalog: String,
+ currentNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.sqlResolutionPathEntries(currentCatalog, currentNamespace)
+
+ override def isSystemSessionOnPath: Boolean = delegate.isSystemSessionOnPath
+
+ override def resolutionPathEntriesForAnalysis(
+ pinnedEntries: Option[Seq[Seq[String]]],
+ viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] =
+ delegate.resolutionPathEntriesForAnalysis(pinnedEntries,
viewCatalogAndNamespace)
+
+ override def sessionFunctionKindsForUnqualifiedResolution():
Seq[SessionFunctionKind] =
+ delegate.sessionFunctionKindsForUnqualifiedResolution()
+
+ override private[sql] def reset(): Unit = delegate.reset()
Review Comment:
I would omit the comment because the base behaviour was only introduced a
few days ago. This PR is a direct follow up to fix issues of the base PR.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]