andreaschat-db commented on code in PR #55278:
URL: https://github.com/apache/spark/pull/55278#discussion_r3159221205


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1005,9 +1031,10 @@ class Analyzer(
       }
     }
 
-    // Resolve V2TableReference nodes in a plan. V2TableReference is only 
created for temp views
-    // (via V2TableReference.createForTempView), so we only need to resolve it 
when returning
-    // the plan of temp views (in resolveViews and unwrapRelationPlan).
+    // Resolve V2TableReference nodes created for:
+    // 1 Temp views (via createForTempView).
+    // 2. Transaction references (via createForTransaction). These are 
resolved by a
+    // separate analysis batch in the transaction-aware analyzer instance.
     private def resolveTableReferences(plan: LogicalPlan): LogicalPlan = {

Review Comment:
   The comment was wrong here. The V2TableReference resolution for transactions 
happens in line 1024. Renamed the function and fixed comment.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -336,6 +336,30 @@ class Analyzer(
     }
   }
 
+  /**
+   * Returns a copy of this analyzer that uses the given [[CatalogManager]] 
for all catalog
+   * lookups. All other configuration (extended rules, checks, etc.) is 
preserved. Used by
+   * [[QueryExecution]] to create a per-query analyzer for transactional 
operations for
+   * transaction-aware catalog resolution.
+   */
+  def withCatalogManager(newCatalogManager: CatalogManager): Analyzer = {

Review Comment:
   Yes it is a hard requirement because otherwise we will silently drop all 
registered extensions. In general, anyone who adds a new extension needs to 
also amend this method. The method is defined in the Analyzer itself so it 
should not be that easy to miss.
   
   In any case, just to be sure, I added a new suite/test, i.e.  
AnalyzerExtensionPropagationSuite, where I use reflection to verify all known 
extensions. If anyone adds a new extension the test will fail and point to 
`withCatalogManager`.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala:
##########
@@ -403,9 +408,18 @@ class RelationResolution(
   }
 
   private def loadRelation(ref: V2TableReference): LogicalPlan = {
-    val table = ref.catalog.loadTable(ref.identifier)
+    // Resolve catalog. When a transaction is active we return the transaction

Review Comment:
   Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala:
##########
@@ -403,9 +408,18 @@ class RelationResolution(
   }
 
   private def loadRelation(ref: V2TableReference): LogicalPlan = {
-    val table = ref.catalog.loadTable(ref.identifier)
+    // Resolve catalog. When a transaction is active we return the transaction
+    // aware catalog instance.
+    val resolvedCatalog = 
catalogManager.catalog(ref.catalog.name).asTableCatalog
+    val table = resolvedCatalog.loadTable(ref.identifier)
     V2TableReferenceUtils.validateLoadedTable(table, ref)
-    ref.toRelation(table)
+    // Create relation with resolved Catalog.

Review Comment:
   Removed.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveTransactionRelations.scala:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
TransactionalWrite}
+import 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, 
LookupCatalog}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+/**
+ * When a transaction is active, converts resolved [[DataSourceV2Relation]] 
nodes back to
+ * [[V2TableReference]] placeholders for all relations loaded by a catalog 
with the same
+ * name as the transaction catalog.
+ *
+ * This forces re-resolution of those relations against the transaction's 
catalog, which
+ * intercepts [[TableCatalog#loadTable]] calls to track which tables are read 
as part of
+ * the transaction.
+ */
+class UnresolveTransactionRelations(val catalogManager: CatalogManager)

Review Comment:
   Yes better. Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/V2TableReference.scala:
##########
@@ -110,11 +119,32 @@ private[sql] object V2TableReferenceUtils extends 
SQLConfHelper {
     ref.context match {
       case ctx: TemporaryViewContext =>
         validateLoadedTableInTempView(table, ref, ctx)
+      case TransactionContext =>
+        validateLoadedTableInTransaction(table, ref)
       case ctx =>
         throw SparkException.internalError(s"Unknown table ref context: 
${ctx.getClass.getName}")
     }
   }
 
+  private def validateLoadedTableInTransaction(table: Table, ref: 
V2TableReference): Unit = {

Review Comment:
   Added validation for tables ID and relevant test. There are already test 
covering `validateCapturedColumns`. We are currently missing tests that verify 
`metadataColumnsChangedAfterAnalysis`. We need to expand testing infra to 
support that.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -998,8 +1002,13 @@ case class MergeIntoTable(
     notMatchedActions: Seq[MergeAction],
     notMatchedBySourceActions: Seq[MergeAction],
     withSchemaEvolution: Boolean)
-    extends BinaryCommand with WriteWithSchemaEvolution with SupportsSubquery {
+    extends BinaryCommand
+    with WriteWithSchemaEvolution
+    with SupportsSubquery
+    with TransactionalWrite {
 
+  // Implements SupportsSchemaEvolution.table.

Review Comment:
   This was added by the schema evolution work. `SupportsSchemaEvolution` was 
later renamed to `WriteWithSchemaEvolution`. Corrected the stale comment. 
Target gives a handle to target to WriteWithSchemaEvolution so it can perform 
the evolution magic.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/CatalogManager.scala:
##########
@@ -57,6 +58,11 @@ class CatalogManager(
     }
   }
 
+  def transaction: Option[Transaction] = None
+
+  def withTransaction(transaction: Transaction): CatalogManager =

Review Comment:
   Yes great idea. Done.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/transactions/TransactionUtils.scala:
##########
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.transactions
+
+import java.util.UUID
+
+import org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin
+import org.apache.spark.sql.connector.catalog.transactions.{Transaction, 
TransactionInfoImpl}
+import org.apache.spark.util.Utils
+
+object TransactionUtils {
+  def commit(transaction: Transaction): Unit = {
+    Utils.tryWithSafeFinally {
+      transaction.commit()
+    } {
+      transaction.close()
+    }
+  }
+
+  def abort(transaction: Transaction): Unit = {
+    Utils.tryWithSafeFinally {
+      transaction.abort()
+    } {
+      transaction.close()
+    }
+  }
+
+  def beginTransaction(catalog: TransactionalCatalogPlugin): Transaction = {
+    val info = TransactionInfoImpl(id = UUID.randomUUID.toString)
+    val transaction = catalog.beginTransaction(info)
+    if (transaction.catalog.name != catalog.name) {
+      abort(transaction)
+      throw new IllegalStateException(

Review Comment:
   Replaced it with `SparkException.internalError`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -79,6 +82,8 @@ class QueryExecution(
   // TODO: Move the planner an optimizer into here from SessionState.
   protected def planner = sparkSession.sessionState.planner
 
+  protected val catalogManager = sparkSession.sessionState.catalogManager

Review Comment:
   Expanded functionality to cover streaming. Added new suites for SQLScripting 
and Streaming.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/RelationResolution.scala:
##########
@@ -391,6 +391,11 @@ class RelationResolution(
   }
 
   private def getOrLoadRelation(ref: V2TableReference): LogicalPlan = {
+    // Skip cache when a transaction is active.

Review Comment:
   That was only meant as a temporary limitation. It was a conservative 
approach until the interaction of the cache with the transaction is understood. 
I looked at it. My understanding is the following:
   
   The `AnalysisContext.get.relationCache` is strictly scoped to a single 
`Analyzer.execute` call. So there is no interference outside the transaction 
and vice-versa. All tables participating in the transaction are resolved within 
the transaction. Therefore, all cache entries should hold transactional tables. 
Removed the guard.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnresolveRelationsInTransaction.scala:
##########
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, 
TransactionalWrite}
+import 
org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.allowInvokingTransformsInAnalyzer
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.{CatalogManager, CatalogPlugin, 
LookupCatalog}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+
+/**
+ * When a transaction is active, converts resolved [[DataSourceV2Relation]] 
nodes back to
+ * [[V2TableReference]] placeholders for all relations loaded by a catalog 
with the same
+ * name as the transaction catalog.
+ *
+ * This forces re-resolution of those relations against the transaction's 
catalog, which
+ * intercepts [[TableCatalog#loadTable]] calls to track which tables are read 
as part of
+ * the transaction.
+ */
+class UnresolveTransactionRelations(val catalogManager: CatalogManager)
+  extends Rule[LogicalPlan] with LookupCatalog {
+
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    catalogManager.transaction match {
+      case Some(transaction) =>
+        allowInvokingTransformsInAnalyzer {

Review Comment:
   Yes resolveOperators skips subtrees that have already being marked as 
analyzed. Furthermore, `allowInvokingTransformsInAnalyzer` allows to suppress 
the `assertNotAnalysisRule` safety check, which forbids calling transform 
directly inside the analyzer when not within a resolveOperators call.
   
   Added comment.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/connector/catalog/TransactionAwareCatalogManager.scala:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.connector.catalog
+
+import org.apache.spark.sql.catalyst.catalog.TempVariableManager
+import org.apache.spark.sql.connector.catalog.transactions.Transaction
+
+/**
+ * A [[CatalogManager]] decorator that redirects catalog lookups to the 
transaction's catalog
+ * instance when names match, ensuring table loads during analysis are scoped 
to the transaction.
+ * All mutable state (current catalog, current namespace, loaded catalogs) is 
delegated to the
+ * wrapped [[CatalogManager]].
+ */
+// TODO: Extracting a CatalogManager trait (so this class can implement it 
instead of extending

Review Comment:
   It is not that bad but I left it out because it would pollute the PR. It can 
be done in a small follow up PR.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statements.scala:
##########
@@ -188,7 +188,11 @@ case class InsertIntoStatement(
     byName: Boolean = false,
     replaceCriteriaOpt: Option[InsertReplaceCriteria] = None,
     withSchemaEvolution: Boolean = false)
-  extends UnaryParsedStatement {
+  // Extends TransactionalWrite so that QueryExecution can detect a potential 
transaction on the
+  // unresolved logical plan before analysis runs. InsertIntoStatement is 
shared between V1 and V2
+  // inserts, but the LookupCatalog.TransactionalWrite extractor only matches 
when the target
+  // catalog implements TransactionalCatalogPlugin, so V1 inserts are never 
assigned a transaction.
+  extends UnaryParsedStatement with TransactionalWrite {

Review Comment:
   Unfortunate indeed. I do not like at all that we are mixing v1 with v2 here 
but I thought refactoring this is out of the context of this PR. 
   
   > Am I right that we sometimes create AppendData directly and some times go 
via statement?
   
   Yes. Dataframe API creates `AppendData`.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala:
##########
@@ -90,6 +95,46 @@ class QueryExecution(
     
logical.exists(_.expressions.exists(_.exists(_.isInstanceOf[LazyExpression])))
   }
 
+
+  // 1. At the pre-Analyzed plan we look for nodes that implement the 
TransactionalWrite trait.
+  //    When a plan contains such a node we initiate a transaction. Note, we 
should never start
+  //    a transaction for operations that are not executed, e.g. EXPLAIN.
+  // 2. Create an analyzer clone with a transaction aware Catalog Manager. The 
latter is the
+  //    narrow waist of all catalog accesses, and it is also the transaction 
context carrier.
+  //    This is then passed to all rules during analysis that need to check 
the catalog. Rules
+  //    that are specifically interested in transactionality can access the 
transaction directly
+  //    from the Catalog Manager. The transaction catalog, is potentially the 
place where connectors
+  //    should keep state about the reads (tables+predicates) that occurred 
during the transaction.
+  // 3. The analyzer instance is passed to nested Query Execution instances. 
These need to respect
+  //    the open transaction instead of creating their own.
+  private lazy val transactionOpt: Option[Transaction] =
+    // Always inherit an active transaction from the outer analyzer, 
regardless of mode.
+    analyzerOpt.flatMap(_.catalogManager.transaction).orElse {
+      // Only begin a new transaction for outer QEs that lead to execution.
+      if (mode != CommandExecutionMode.SKIP) {
+        val catalog = logical match {
+          case UnresolvedWith(TransactionalWrite(c), _, _) => Some(c)
+          case TransactionalWrite(c) => Some(c)
+          case _ => None
+        }
+        catalog.map(TransactionUtils.beginTransaction)
+      } else {
+        None
+      }
+    }
+
+  // Per-query analyzer: uses a transaction-aware CatalogManager when a 
transaction is active,
+  // so that all catalog lookups and rule applications during analysis see the 
correct state
+  // without relying on thread-local context.
+  private lazy val analyzer: Analyzer = analyzerOpt.getOrElse {
+    transactionOpt match {
+      case Some(txn) =>
+        
sparkSession.sessionState.analyzer.withCatalogManager(catalogManager.withTransaction(txn))
+      case None =>
+        sparkSession.sessionState.analyzer

Review Comment:
   External callers cannot access it because it is private. The only issue I 
see is transactional writes that create internal QE not propagating the 
transactional analyzer instance to the new QE instance. I added a new comment 
above `analyzerOpt` to stress this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to