andreaschat-db commented on code in PR #55278:
URL: https://github.com/apache/spark/pull/55278#discussion_r3166527139


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala:
##########
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.analysis.resolver.{ResolverExtension, 
TreeNodeResolver}
+import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.connector.catalog.CatalogManager
+
+/**
+ * Verifies that [[Analyzer.withCatalogManager]] propagates all extension 
points.
+ *
+ * If this suite fails with an unexpected method count, a new extension point 
was added to
+ * [[Analyzer.withCatalogManager]] without being verified here. Add the 
corresponding assertion
+ * and update the expected count.
+ *
+ * If [[Analyzer]] gains a new extension point that is NOT yet in 
[[Analyzer.withCatalogManager]],
+ * add it there first, then update this suite.
+ */
+class AnalyzerExtensionPropagationSuite extends SparkFunSuite {
+
+  private val dummyRule: Rule[LogicalPlan] = new Rule[LogicalPlan] {
+    override def apply(plan: LogicalPlan): LogicalPlan = plan
+  }
+
+  private val dummyCheck: LogicalPlan => Unit = (_: LogicalPlan) => ()
+
+  private val dummyExtension: ResolverExtension = new ResolverExtension {
+    override def resolveOperator(
+        operator: LogicalPlan,
+        resolver: TreeNodeResolver[LogicalPlan, LogicalPlan]): 
Option[LogicalPlan] = None
+  }
+
+  private def newCatalogManager(): CatalogManager =
+    new CatalogManager(
+      FakeV2SessionCatalog,
+      new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry))
+
+  test("withCatalogManager propagates all extension points") {
+    val analyzer = new Analyzer(newCatalogManager()) {
+      override val hintResolutionRules: Seq[Rule[LogicalPlan]] = Seq(dummyRule)
+      override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = 
Seq(dummyRule)
+      override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = 
Seq(dummyRule)
+      override val extendedCheckRules: Seq[LogicalPlan => Unit] = 
Seq(dummyCheck)
+      override val singlePassResolverExtensions: Seq[ResolverExtension] = 
Seq(dummyExtension)
+      override val singlePassMetadataResolverExtensions: 
Seq[ResolverExtension] =
+        Seq(dummyExtension)
+      override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] = 
Seq(dummyRule)
+      override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => 
Unit] = Seq(dummyCheck)
+    }
+
+    val clone = analyzer.withCatalogManager(newCatalogManager())
+
+    assert(clone.hintResolutionRules eq analyzer.hintResolutionRules)
+    assert(clone.extendedResolutionRules eq analyzer.extendedResolutionRules)
+    assert(clone.postHocResolutionRules eq analyzer.postHocResolutionRules)
+    assert(clone.extendedCheckRules eq analyzer.extendedCheckRules)
+    assert(clone.singlePassResolverExtensions eq 
analyzer.singlePassResolverExtensions)
+    assert(clone.singlePassMetadataResolverExtensions eq
+      analyzer.singlePassMetadataResolverExtensions)
+    assert(clone.singlePassPostHocResolutionRules eq 
analyzer.singlePassPostHocResolutionRules)
+    assert(clone.singlePassExtendedResolutionChecks eq 
analyzer.singlePassExtendedResolutionChecks)
+
+    // Verify the clone's anonymous class overrides exactly the expected 
extension points.
+    // If this assertion fails, withCatalogManager was updated but this test 
was not.
+    // Add the corresponding assert above and update the expected set.
+    val overriddenMethods = clone.getClass.getDeclaredMethods
+      .filterNot(m => m.isSynthetic || m.isBridge || m.getName.contains("$"))
+      .map(_.getName)
+      .toSet
+
+    val expectedExtensions = Set(

Review Comment:
   Yes missed that. Fixed.



##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java:
##########
@@ -80,6 +80,12 @@ default boolean useCommitCoordinator() {
    * The execution engine may call {@code commit} multiple times for the same 
epoch in some
    * circumstances. To support exactly-once data semantics, implementations 
must ensure that
    * multiple commits for the same epoch are idempotent.
+   * <p>
+   * Note: this method signals that all data for this write operation has been 
successfully written.
+   * It is NOT a transactional commit. When this write is part of a

Review Comment:
   Updated both this and the one at BatchWrite.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala:
##########
@@ -1272,6 +1280,22 @@ case class Assignment(key: Expression, value: 
Expression) extends Expression
     newLeft: Expression, newRight: Expression): Assignment = copy(key = 
newLeft, value = newRight)
 }
 
+/**
+ * Marker trait for write operations that participate in a DSv2 transaction.
+ *
+ * Implementations are expected to target a DSv2 catalog backed by a
+ * [[org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin]].
+ */
+trait TransactionalWrite extends LogicalPlan {
+  def table: LogicalPlan
+}
+
+/** Trait for streaming write commands that participate in DSv2 transactions. 
*/
+trait StreamingV2WriteCommand extends TransactionalWrite {

Review Comment:
   Is the question about naming? I renamed it to `V2StreamingWriteCommand` to 
be consistent with other commands. I also moved right after `V2WriteCommand`. 
Is this what you had in mind?



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##########
@@ -1024,17 +1053,43 @@ class Analyzer(
     // Unwrap temp views storing analyzed plans and resolve V2TableReference 
nodes in the child.
     private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = {
       EliminateSubqueryAliases(plan) match {
-        case v: View if v.isTempViewStoringAnalyzedPlan => 
resolveTableReferences(v.child)
+        case v: View if v.isTempViewStoringAnalyzedPlan => 
resolveTableReferencesInTempView(v.child)
         case other => other
       }
     }
 
-    // Resolve V2TableReference nodes in a plan. V2TableReference is only 
created for temp views
-    // (via V2TableReference.createForTempView), so we only need to resolve it 
when returning
+    // Resolve the write target of a V2 write command (batch or streaming).
+    private def resolveWriteTarget(

Review Comment:
   The `UnresolvedRelation` relation we create in `MicroBatchExecution` has 
streaming = false. This then matched in resolveWriteTarget `!u.isStreaming` and 
gets resolved to a `DataSourceV2Relation`. 
   
   The alternative here would be to set to streaming in `UnresolvedRelation` to 
True. This then get resolved as a `StreamingRelationV2` instead which we would 
need to add a case and finally resolve to `DataSourceV2Relation` (we would also 
need to drop the  `!u.isStreaming` guard.
   
   IIUC, streaming is a read side concept. For the write perspective, we write 
batches which are not streams any more. This is I think the reason we 
originally create a `DataSourceV2Relation` in  `MicroBatchExecution` instead of 
a `StreamingDataSourceV2Relation`. So the path `UnresolvedRelation` with 
`streaming=true` -> `StreamingRelationV2` -> `StreamingDataSourceV2Relation` is 
not applicable here. I added a comment to explain this.
   
   Thoughts?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to