andreaschat-db commented on code in PR #55278:
URL: https://github.com/apache/spark/pull/55278#discussion_r3159221205
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1005,9 +1031,10 @@ class Analyzer(
}
}
- // Resolve V2TableReference nodes in a plan. V2TableReference is only created for temp views
- // (via V2TableReference.createForTempView), so we only need to resolve it when returning
- // the plan of temp views (in resolveViews and unwrapRelationPlan).
+ // Resolve V2TableReference nodes created for:
+ // 1 Temp views (via createForTempView).
+ // 2. Transaction references (via createForTransaction). These are resolved by a
+ // separate analysis batch in the transaction-aware analyzer instance.
private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = {
Review Comment:
The comment was wrong here. The V2TableReference resolution for transactions happens at line 1024. Renamed the function and fixed the comment.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -336,6 +336,30 @@ class Analyzer(
}
}
+ /**
+ * Returns a copy of this analyzer that uses the given [[CatalogManager]] for all catalog
+ * lookups. All other configuration (extended rules, checks, etc.) is preserved. Used by
+ * [[QueryExecution]] to create a per-query analyzer for transactional operations for
+ * transaction-aware catalog resolution.
+ */
+ def withCatalogManager(newCatalogManager: CatalogManager): Analyzer = {
Review Comment:
Yes, it is a hard requirement; otherwise we would silently drop all registered extensions. In general, anyone who adds a new extension also needs to amend this method. The method is defined in the Analyzer itself, so it should not be easy to miss.
In any case, just to be sure, I added a new suite, AnalyzerExtensionPropagationSuite, which uses reflection to verify all known extensions. If anyone adds a new extension, the test will fail and point to `withCatalogManager`.
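To illustrate the failure mode, a minimal sketch assuming a simplified analyzer whose extension points are overridable members. `MiniAnalyzer`, its `extended*` members, `ExtensionPropagationCheck` and `ExtensionPropagationDemo` are hypothetical, not the real Analyzer API or the new suite. The copy has to forward every member explicitly, and a reflection check over the `extended*` members catches any that revert to their defaults.

```scala
// Hypothetical, simplified model; not Spark's Analyzer. Extension points are
// overridable members, as when a session builder creates an anonymous subclass.
class MiniAnalyzer(val catalogManager: String) {
  def extendedResolutionRules: Seq[String] = Nil
  def extendedCheckRules: Seq[String] = Nil

  // Copy bound to another catalog manager. Each extension point must be
  // forwarded explicitly; a member left out silently reverts to its default.
  def withCatalogManager(newCatalogManager: String): MiniAnalyzer = {
    val self = this
    new MiniAnalyzer(newCatalogManager) {
      override def extendedResolutionRules: Seq[String] = self.extendedResolutionRules
      override def extendedCheckRules: Seq[String] = self.extendedCheckRules
    }
  }
}

// Hypothetical reflection guard: every zero-arg "extended*" member must return
// the same value on the original and on the copy.
object ExtensionPropagationCheck {
  def unpropagated(original: MiniAnalyzer, copy: MiniAnalyzer): Seq[String] =
    classOf[MiniAnalyzer].getDeclaredMethods.toSeq
      .filter(m => m.getName.startsWith("extended") && m.getParameterCount == 0)
      .filter(m => m.invoke(original) != m.invoke(copy))
      .map(_.getName)
}

object ExtensionPropagationDemo {
  val base: MiniAnalyzer = new MiniAnalyzer("session") {
    override def extendedResolutionRules: Seq[String] = Seq("MyResolutionRule")
    override def extendedCheckRules: Seq[String] = Seq("MyCheck")
  }
  val copy: MiniAnalyzer = base.withCatalogManager("transaction-aware")
}
```

Dropping one of the overrides in `withCatalogManager` makes `unpropagated` report that member's name.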
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala:
##########
@@ -403,9 +408,18 @@ class RelationResolution(
}
private def loadRelation(ref: V2TableReference): LogicalPlan = {
- val table = ref.catalog.loadTable(ref.identifier)
+ // Resolve catalog. When a transaction is active we return the transaction
Review Comment:
Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala:
##########
@@ -403,9 +408,18 @@ class RelationResolution(
}
private def loadRelation(ref: V2TableReference): LogicalPlan = {
- val table = ref.catalog.loadTable(ref.identifier)
+ // Resolve catalog. When a transaction is active we return the transaction
+ // aware catalog instance.
+ val resolvedCatalog = catalogManager.catalog(ref.catalog.name).asTableCatalog
+ val table = resolvedCatalog.loadTable(ref.identifier)
V2TableReferenceUtils.validateLoadedTable(table, ref)
- ref.toRelation(table)
+ // Create relation with resolved Catalog.
Review Comment:
Removed.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveTransactionRelations.scala:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TransactionalWrite}
+import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+/**
+ * When a transaction is active, converts resolved [[DataSourceV2Relation]] nodes back to
+ * [[V2TableReference]] placeholders for all relations loaded by a catalog with the same
+ * name as the transaction catalog.
+ *
+ * This forces re-resolution of those relations against the transaction's catalog, which
+ * intercepts [[TableCatalog#loadTable]] calls to track which tables are read as part of
+ * the transaction.
+ */
+class UnresolveTransactionRelations(val catalogManager: CatalogManager)
Review Comment:
Yes, that is better. Done.
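For reference, the rewrite this rule performs can be sketched on a toy plan tree. Everything here (`Plan`, `Relation`, `TableReference`, `Join`, `UnresolveForTransaction`) is a hypothetical stand-in for `DataSourceV2Relation`, `V2TableReference` and the real rule; only relations loaded by a catalog whose name matches the transaction catalog are turned back into references.

```scala
// Hypothetical toy plan tree; not Spark's LogicalPlan / TreeNode API.
sealed trait Plan {
  def children: Seq[Plan]
  def withChildren(newChildren: Seq[Plan]): Plan

  // Bottom-up rewrite that visits every node.
  def transformUp(rule: PartialFunction[Plan, Plan]): Plan = {
    val rewritten = withChildren(children.map(_.transformUp(rule)))
    rule.applyOrElse(rewritten, (p: Plan) => p)
  }
}

// Resolved relation, tagged with the name of the catalog that loaded it.
final case class Relation(catalogName: String, table: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

// Placeholder that forces the table to be loaded again.
final case class TableReference(catalogName: String, table: String) extends Plan {
  def children: Seq[Plan] = Nil
  def withChildren(newChildren: Seq[Plan]): Plan = this
}

final case class Join(left: Plan, right: Plan) extends Plan {
  def children: Seq[Plan] = Seq(left, right)
  def withChildren(newChildren: Seq[Plan]): Plan = Join(newChildren(0), newChildren(1))
}

object UnresolveForTransaction {
  // No-op without a transaction; otherwise only relations from a catalog with
  // the transaction catalog's name become references again.
  def apply(plan: Plan, transactionCatalog: Option[String]): Plan =
    transactionCatalog match {
      case Some(txnCatalog) =>
        plan.transformUp {
          case Relation(catalog, table) if catalog == txnCatalog =>
            TableReference(catalog, table)
        }
      case None => plan
    }
}
```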
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala:
##########
@@ -110,11 +119,32 @@ private[sql] object V2TableReferenceUtils extends SQLConfHelper {
ref.context match {
case ctx: TemporaryViewContext =>
validateLoadedTableInTempView(table, ref, ctx)
+ case TransactionContext =>
+ validateLoadedTableInTransaction(table, ref)
case ctx =>
throw SparkException.internalError(s"Unknown table ref context: ${ctx.getClass.getName}")
}
}
+ private def validateLoadedTableInTransaction(table: Table, ref: V2TableReference): Unit = {
Review Comment:
Added validation for the table ID and a corresponding test. There are already tests covering `validateCapturedColumns`. We are currently missing tests that verify `metadataColumnsChangedAfterAnalysis`; we need to expand the testing infra to support that.
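A rough sketch of the kind of check an ID validation performs, assuming the reference captures the table ID when it is created and the reload inside the transaction must return the same table (presumably to catch a table that was dropped and recreated under the same name). `LoadedTable`, `TableRef` and `TransactionTableValidation` are hypothetical, and accepting the table when either side exposes no ID is an assumption of this sketch.

```scala
// Hypothetical stand-ins for a loaded table and a table reference; not
// Spark's Table or V2TableReference.
final case class LoadedTable(name: String, id: Option[String])
final case class TableRef(name: String, capturedId: Option[String])

object TransactionTableValidation {
  // Rejects a reload that returns a different table than the one the
  // reference was created from. When either side has no ID, this sketch
  // accepts the table (an assumption, not necessarily the PR's behavior).
  def validate(table: LoadedTable, ref: TableRef): Either[String, LoadedTable] =
    (ref.capturedId, table.id) match {
      case (Some(expected), Some(actual)) if expected != actual =>
        Left(s"Table ${ref.name} changed during the transaction: expected ID $expected, got $actual")
      case _ =>
        Right(table)
    }
}
```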
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -998,8 +1002,13 @@ case class MergeIntoTable(
notMatchedActions: Seq[MergeAction],
notMatchedBySourceActions: Seq[MergeAction],
withSchemaEvolution: Boolean)
- extends BinaryCommand with WriteWithSchemaEvolution with SupportsSubquery {
+ extends BinaryCommand
+ with WriteWithSchemaEvolution
+ with SupportsSubquery
+ with TransactionalWrite {
+ // Implements SupportsSchemaEvolution.table.
Review Comment:
This was added by the schema evolution work. `SupportsSchemaEvolution` was later renamed to `WriteWithSchemaEvolution`; I corrected the stale comment. The override gives `WriteWithSchemaEvolution` a handle to the target table so it can perform the schema evolution.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -57,6 +58,11 @@ class CatalogManager(
}
}
+ def transaction: Option[Transaction] = None
+
+ def withTransaction(transaction: Transaction): CatalogManager =
Review Comment:
Yes, great idea. Done.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtils.scala:
##########
@@ -0,0 +1,55 @@
+
+package org.apache.spark.sql.catalyst.transactions
+
+import java.util.UUID
+
+import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin
+import org.apache.spark.sql.connector.catalog.transactions.{Transaction, TransactionInfoImpl}
+import org.apache.spark.util.Utils
+
+object TransactionUtils {
+ def commit(transaction: Transaction): Unit = {
+ Utils.tryWithSafeFinally {
+ transaction.commit()
+ } {
+ transaction.close()
+ }
+ }
+
+ def abort(transaction: Transaction): Unit = {
+ Utils.tryWithSafeFinally {
+ transaction.abort()
+ } {
+ transaction.close()
+ }
+ }
+
+ def beginTransaction(catalog: TransactionalCatalogPlugin): Transaction = {
+ val info = TransactionInfoImpl(id = UUID.randomUUID.toString)
+ val transaction = catalog.beginTransaction(info)
+ if (transaction.catalog.name != catalog.name) {
+ abort(transaction)
+ throw new IllegalStateException(
Review Comment:
Replaced it with `SparkException.internalError`.
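For context on the helpers in this file, a self-contained sketch of the commit/abort-then-close pattern. `Txn` and `TxnUtilsSketch` are hypothetical; `tryWithSafeFinallySketch` reimplements the idea behind Spark's `Utils.tryWithSafeFinally` (the cleanup always runs, and if both the body and the cleanup fail, the body's exception is rethrown with the cleanup failure attached as suppressed). The sketch throws a plain `IllegalStateException` only to stay free of Spark dependencies; the PR now uses `SparkException.internalError`.

```scala
// Hypothetical stand-in for the connector Transaction interface.
trait Txn {
  def catalogName: String
  def commit(): Unit
  def abort(): Unit
  def close(): Unit
}

object TxnUtilsSketch {
  // Runs `block`, then always runs `finallyBlock`. If both fail, the block's
  // exception is rethrown and the cleanup failure is attached as suppressed.
  def tryWithSafeFinallySketch[T](block: => T)(finallyBlock: => Unit): T = {
    var original: Throwable = null
    try {
      block
    } catch {
      case t: Throwable =>
        original = t
        throw t
    } finally {
      try {
        finallyBlock
      } catch {
        case t: Throwable if original != null && (original ne t) =>
          original.addSuppressed(t)
      }
    }
  }

  def commit(txn: Txn): Unit = tryWithSafeFinallySketch(txn.commit())(txn.close())
  def abort(txn: Txn): Unit = tryWithSafeFinallySketch(txn.abort())(txn.close())

  // Mirrors the name check in beginTransaction: a transaction bound to a
  // different catalog is aborted (and closed) before the error is raised.
  def checkCatalog(txn: Txn, expectedCatalog: String): Txn = {
    if (txn.catalogName != expectedCatalog) {
      abort(txn)
      throw new IllegalStateException(
        s"Transaction catalog ${txn.catalogName} does not match $expectedCatalog")
    }
    txn
  }
}
```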
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -79,6 +82,8 @@ class QueryExecution(
// TODO: Move the planner an optimizer into here from SessionState.
protected def planner = sparkSession.sessionState.planner
+ protected val catalogManager = sparkSession.sessionState.catalogManager
Review Comment:
Expanded the functionality to cover streaming. Added new suites for SQL scripting and streaming.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala:
##########
@@ -391,6 +391,11 @@ class RelationResolution(
}
private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
+ // Skip cache when a transaction is active.
Review Comment:
That was only meant as a temporary limitation: a conservative approach until the interaction of the cache with the transaction was understood.
I looked into it. My understanding is the following:
`AnalysisContext.get.relationCache` is strictly scoped to a single `Analyzer.execute` call, so nothing cached outside the transaction can leak into it, and vice versa. All tables participating in the transaction are resolved within the transaction, so all cache entries should hold transactional tables. Removed the guard.
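A minimal sketch of why a per-call cache cannot mix transactional and non-transactional entries, assuming a simplified analysis run (`MiniAnalyzerRun` and the `loadTable` callback are hypothetical, not `AnalysisContext`). The cache is created inside `execute`, so every entry was loaded through the loader of that run, which in the transactional case is the transaction-aware one.

```scala
import scala.collection.mutable

// Hypothetical model of one analysis run; not Spark's Analyzer or
// AnalysisContext. `loadTable` stands for whatever catalog the run resolves
// against (the transaction-aware one when a transaction is active).
final class MiniAnalyzerRun(loadTable: String => String) {
  def execute(tables: Seq[String]): Seq[String] = {
    // A fresh cache per call: entries cannot leak into, or come from, another run.
    val relationCache = mutable.HashMap.empty[String, String]
    tables.map(t => relationCache.getOrElseUpdate(t, loadTable(t)))
  }
}
```

Within one run a repeated table is loaded once; a second run loads it again.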
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala:
##########
@@ -0,0 +1,64 @@
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, TransactionalWrite}
+import org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, LookupCatalog}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+/**
+ * When a transaction is active, converts resolved [[DataSourceV2Relation]] nodes back to
+ * [[V2TableReference]] placeholders for all relations loaded by a catalog with the same
+ * name as the transaction catalog.
+ *
+ * This forces re-resolution of those relations against the transaction's catalog, which
+ * intercepts [[TableCatalog#loadTable]] calls to track which tables are read as part of
+ * the transaction.
+ */
+class UnresolveTransactionRelations(val catalogManager: CatalogManager)
+ extends Rule[LogicalPlan] with LookupCatalog {
+
+ override def apply(plan: LogicalPlan): LogicalPlan =
+ catalogManager.transaction match {
+ case Some(transaction) =>
+ allowInvokingTransformsInAnalyzer {
Review Comment:
Yes, resolveOperators skips subtrees that have already been marked as analyzed, so it would not revisit the already resolved relations. Furthermore, `allowInvokingTransformsInAnalyzer` suppresses the `assertNotAnalysisRule` safety check, which forbids calling transform directly inside the analyzer outside of a resolveOperators call. Added a comment.
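To make the traversal difference concrete, a toy sketch with a hypothetical `Node` type whose `analyzed` flag mimics the one the analyzer sets. `resolveOperatorsUp` returns analyzed subtrees untouched, while `transformUp` visits every node, which is what un-resolving already analyzed relations requires. The method names mirror Spark's, but the signatures are simplified assumptions.

```scala
// Hypothetical toy tree; `analyzed` mimics the flag set on resolved subtrees.
final case class Node(name: String, children: List[Node] = Nil, analyzed: Boolean = false) {

  // Like resolveOperators: subtrees already marked analyzed are returned as-is.
  def resolveOperatorsUp(rule: PartialFunction[Node, Node]): Node =
    if (analyzed) this
    else {
      val withNewChildren = copy(children = children.map(_.resolveOperatorsUp(rule)))
      rule.applyOrElse(withNewChildren, (n: Node) => n)
    }

  // Like a plain transformUp: every node is visited regardless of the flag.
  def transformUp(rule: PartialFunction[Node, Node]): Node = {
    val withNewChildren = copy(children = children.map(_.transformUp(rule)))
    rule.applyOrElse(withNewChildren, (n: Node) => n)
  }
}

object TraversalDemo {
  // An already analyzed relation sitting under an unanalyzed root.
  val plan: Node = Node("Project", List(Node("Relation", analyzed = true)))

  val unresolve: PartialFunction[Node, Node] = {
    case n if n.name == "Relation" => n.copy(name = "TableReference", analyzed = false)
  }

  val viaResolveOperators: Node = plan.resolveOperatorsUp(unresolve)
  val viaTransform: Node = plan.transformUp(unresolve)
}
```

Only `viaTransform` contains the `TableReference`; `viaResolveOperators` is unchanged.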
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala:
##########
@@ -0,0 +1,60 @@
+
+package org.apache.spark.sql.connector.catalog
+
+import org.apache.spark.sql.catalyst.catalog.TempVariableManager
+import org.apache.spark.sql.connector.catalog.transactions.Transaction
+
+/**
+ * A [[CatalogManager]] decorator that redirects catalog lookups to the transaction's catalog
+ * instance when names match, ensuring table loads during analysis are scoped to the transaction.
+ * All mutable state (current catalog, current namespace, loaded catalogs) is delegated to the
+ * wrapped [[CatalogManager]].
+ */
+// TODO: Extracting a CatalogManager trait (so this class can implement it instead of extending
Review Comment:
It is not that bad, but I left it out because it would pollute the PR. It can be done in a small follow-up PR.
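A simplified sketch of the decorator shape described in the scaladoc, assuming toy types (`CatalogStub`, `TxnHandle`, `SimpleCatalogManager` and `TransactionAwareManager` are hypothetical, not Spark's `CatalogManager`). Lookups for the transaction's catalog name are redirected, while mutable state such as the current catalog lives only in the wrapped manager. The unused `current` field that the decorator inherits is the kind of leftover that extracting a trait would avoid.

```scala
// Hypothetical toy types; not Spark's CatalogManager, CatalogPlugin or Transaction.
final case class CatalogStub(name: String, scope: String)
final case class TxnHandle(id: String, catalog: CatalogStub)

class SimpleCatalogManager(catalogs: Map[String, CatalogStub]) {
  private var current: String = "spark_catalog"

  def currentCatalog: String = current
  def setCurrentCatalog(name: String): Unit = current = name
  def catalog(name: String): CatalogStub = catalogs(name)

  // No transaction unless wrapped via withTransaction.
  def transaction: Option[TxnHandle] = None
  def withTransaction(txn: TxnHandle): SimpleCatalogManager =
    new TransactionAwareManager(this, txn)
}

// Decorator: extends the concrete class (hence the unused inherited `current`
// field), redirects lookups by name and delegates all mutable state.
class TransactionAwareManager(delegate: SimpleCatalogManager, txn: TxnHandle)
    extends SimpleCatalogManager(Map.empty) {
  override def transaction: Option[TxnHandle] = Some(txn)
  override def catalog(name: String): CatalogStub =
    if (name == txn.catalog.name) txn.catalog else delegate.catalog(name)
  override def currentCatalog: String = delegate.currentCatalog
  override def setCurrentCatalog(name: String): Unit = delegate.setCurrentCatalog(name)
}
```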
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala:
##########
@@ -188,7 +188,11 @@ case class InsertIntoStatement(
byName: Boolean = false,
replaceCriteriaOpt: Option[InsertReplaceCriteria] = None,
withSchemaEvolution: Boolean = false)
- extends UnaryParsedStatement {
+ // Extends TransactionalWrite so that QueryExecution can detect a potential transaction on the
+ // unresolved logical plan before analysis runs. InsertIntoStatement is shared between V1 and V2
+ // inserts, but the LookupCatalog.TransactionalWrite extractor only matches when the target
+ // catalog implements TransactionalCatalogPlugin, so V1 inserts are never assigned a transaction.
+ extends UnaryParsedStatement with TransactionalWrite {
Review Comment:
Unfortunate indeed. I do not like at all that we are mixing v1 and v2 here, but I thought refactoring this was out of scope for this PR.
> Am I right that we sometimes create AppendData directly and some times go via statement?
Yes. The DataFrame API creates `AppendData` directly.
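A sketch of how one extractor can cover both the SQL path (statement) and the DataFrame path (`AppendData` directly) while never matching V1 targets, assuming toy types (`CatalogKind`, `InsertStmt`, `AppendDataCmd` and `TransactionalWriteTarget` are hypothetical stand-ins for `InsertIntoStatement`, `AppendData` and `LookupCatalog.TransactionalWrite`).

```scala
// Hypothetical toy types; not Spark's plans, LookupCatalog or TransactionalCatalogPlugin.
sealed trait CatalogKind { def name: String }
final case class V1SessionCatalog(name: String) extends CatalogKind
final case class TransactionalCatalog(name: String) extends CatalogKind

sealed trait Stmt
final case class InsertStmt(targetCatalog: CatalogKind, table: String) extends Stmt
final case class AppendDataCmd(targetCatalog: CatalogKind, table: String) extends Stmt
final case class SelectStmt(table: String) extends Stmt

object TransactionalWriteTarget {
  // Matches write nodes from either path, but only yields a catalog that
  // supports transactions, so a V1 insert never starts one.
  def unapply(stmt: Stmt): Option[TransactionalCatalog] = stmt match {
    case InsertStmt(c: TransactionalCatalog, _)    => Some(c)
    case AppendDataCmd(c: TransactionalCatalog, _) => Some(c)
    case _                                         => None
  }
}
```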
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -90,6 +95,46 @@ class QueryExecution(
logical.exists(_.expressions.exists(_.exists(_.isInstanceOf[LazyExpression])))
}
+
+ // 1. At the pre-Analyzed plan we look for nodes that implement the TransactionalWrite trait.
+ // When a plan contains such a node we initiate a transaction. Note, we should never start
+ // a transaction for operations that are not executed, e.g. EXPLAIN.
+ // 2. Create an analyzer clone with a transaction aware Catalog Manager. The latter is the
+ // narrow waist of all catalog accesses, and it is also the transaction context carrier.
+ // This is then passed to all rules during analysis that need to check the catalog. Rules
+ // that are specifically interested in transactionality can access the transaction directly
+ // from the Catalog Manager. The transaction catalog, is potentially the place where connectors
+ // should keep state about the reads (tables+predicates) that occurred during the transaction.
+ // 3. The analyzer instance is passed to nested Query Execution instances. These need to respect
+ // the open transaction instead of creating their own.
+ private lazy val transactionOpt: Option[Transaction] =
+ // Always inherit an active transaction from the outer analyzer, regardless of mode.
+ analyzerOpt.flatMap(_.catalogManager.transaction).orElse {
+ // Only begin a new transaction for outer QEs that lead to execution.
+ if (mode != CommandExecutionMode.SKIP) {
+ val catalog = logical match {
+ case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c)
+ case TransactionalWrite(c) => Some(c)
+ case _ => None
+ }
+ catalog.map(TransactionUtils.beginTransaction)
+ } else {
+ None
+ }
+ }
+
+ // Per-query analyzer: uses a transaction-aware CatalogManager when a transaction is active,
+ // so that all catalog lookups and rule applications during analysis see the correct state
+ // without relying on thread-local context.
+ private lazy val analyzer: Analyzer = analyzerOpt.getOrElse {
+ transactionOpt match {
+ case Some(txn) =>
+ sparkSession.sessionState.analyzer.withCatalogManager(catalogManager.withTransaction(txn))
+ case None =>
+ sparkSession.sessionState.analyzer
Review Comment:
External callers cannot access it because it is private. The only issue I see is a transactional write that creates an internal QE without propagating the transactional analyzer instance to it. I added a new comment above `analyzerOpt` to stress this.
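For readers of this thread, the decision `transactionOpt` makes can be sketched with toy types (`ExecMode`, `LPlan`, `Write`, `CteWrapper`, `Read` and the `begin` callback are hypothetical; `CteWrapper` stands in for `UnresolvedWith`). An inherited transaction always wins; otherwise one is begun only for plans that will execute and whose root, possibly under a CTE, is a transactional write.

```scala
// Hypothetical toy model; not Spark's QueryExecution or CommandExecutionMode.
sealed trait ExecMode
case object Execute extends ExecMode
case object Skip extends ExecMode // e.g. EXPLAIN: never begins a transaction

sealed trait LPlan
final case class Write(catalog: String) extends LPlan
final case class CteWrapper(child: LPlan) extends LPlan // stands in for UnresolvedWith
final case class Read(table: String) extends LPlan

object TransactionDecision {
  def transactionFor(
      inherited: Option[String], // transaction of the outer analyzer, if any
      mode: ExecMode,
      plan: LPlan,
      begin: String => String): Option[String] =
    inherited.orElse {
      if (mode != Skip) {
        val catalog = plan match {
          case CteWrapper(Write(c)) => Some(c)
          case Write(c)             => Some(c)
          case _                    => None
        }
        catalog.map(begin)
      } else {
        None
      }
    }
}
```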
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]