andreaschat-db commented on code in PR #55278: URL: https://github.com/apache/spark/pull/55278#discussion_r3166527139
########## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalyzerExtensionPropagationSuite.scala: ########## @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.analysis.resolver.{ResolverExtension, TreeNodeResolver} +import org.apache.spark.sql.catalyst.catalog.{InMemoryCatalog, SessionCatalog} +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.connector.catalog.CatalogManager + +/** + * Verifies that [[Analyzer.withCatalogManager]] propagates all extension points. + * + * If this suite fails with an unexpected method count, a new extension point was added to + * [[Analyzer.withCatalogManager]] without being verified here. Add the corresponding assertion + * and update the expected count. + * + * If [[Analyzer]] gains a new extension point that is NOT yet in [[Analyzer.withCatalogManager]], + * add it there first, then update this suite. 
+ */ +class AnalyzerExtensionPropagationSuite extends SparkFunSuite { + + private val dummyRule: Rule[LogicalPlan] = new Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = plan + } + + private val dummyCheck: LogicalPlan => Unit = (_: LogicalPlan) => () + + private val dummyExtension: ResolverExtension = new ResolverExtension { + override def resolveOperator( + operator: LogicalPlan, + resolver: TreeNodeResolver[LogicalPlan, LogicalPlan]): Option[LogicalPlan] = None + } + + private def newCatalogManager(): CatalogManager = + new CatalogManager( + FakeV2SessionCatalog, + new SessionCatalog(new InMemoryCatalog, EmptyFunctionRegistry)) + + test("withCatalogManager propagates all extension points") { + val analyzer = new Analyzer(newCatalogManager()) { + override val hintResolutionRules: Seq[Rule[LogicalPlan]] = Seq(dummyRule) + override val extendedResolutionRules: Seq[Rule[LogicalPlan]] = Seq(dummyRule) + override val postHocResolutionRules: Seq[Rule[LogicalPlan]] = Seq(dummyRule) + override val extendedCheckRules: Seq[LogicalPlan => Unit] = Seq(dummyCheck) + override val singlePassResolverExtensions: Seq[ResolverExtension] = Seq(dummyExtension) + override val singlePassMetadataResolverExtensions: Seq[ResolverExtension] = + Seq(dummyExtension) + override val singlePassPostHocResolutionRules: Seq[Rule[LogicalPlan]] = Seq(dummyRule) + override val singlePassExtendedResolutionChecks: Seq[LogicalPlan => Unit] = Seq(dummyCheck) + } + + val clone = analyzer.withCatalogManager(newCatalogManager()) + + assert(clone.hintResolutionRules eq analyzer.hintResolutionRules) + assert(clone.extendedResolutionRules eq analyzer.extendedResolutionRules) + assert(clone.postHocResolutionRules eq analyzer.postHocResolutionRules) + assert(clone.extendedCheckRules eq analyzer.extendedCheckRules) + assert(clone.singlePassResolverExtensions eq analyzer.singlePassResolverExtensions) + assert(clone.singlePassMetadataResolverExtensions eq + 
analyzer.singlePassMetadataResolverExtensions) + assert(clone.singlePassPostHocResolutionRules eq analyzer.singlePassPostHocResolutionRules) + assert(clone.singlePassExtendedResolutionChecks eq analyzer.singlePassExtendedResolutionChecks) + + // Verify the clone's anonymous class overrides exactly the expected extension points. + // If this assertion fails, withCatalogManager was updated but this test was not. + // Add the corresponding assert above and update the expected set. + val overriddenMethods = clone.getClass.getDeclaredMethods + .filterNot(m => m.isSynthetic || m.isBridge || m.getName.contains("$")) + .map(_.getName) + .toSet + + val expectedExtensions = Set( Review Comment: Yes, I missed that. Fixed. ########## sql/catalyst/src/main/java/org/apache/spark/sql/connector/write/streaming/StreamingWrite.java: ########## @@ -80,6 +80,12 @@ default boolean useCommitCoordinator() { * The execution engine may call {@code commit} multiple times for the same epoch in some * circumstances. To support exactly-once data semantics, implementations must ensure that * multiple commits for the same epoch are idempotent. + * <p> + * Note: this method signals that all data for this write operation has been successfully written. + * It is NOT a transactional commit. When this write is part of a Review Comment: Updated both this one and the one in `BatchWrite`. ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2Commands.scala: ########## @@ -1272,6 +1280,22 @@ case class Assignment(key: Expression, value: Expression) extends Expression newLeft: Expression, newRight: Expression): Assignment = copy(key = newLeft, value = newRight) } +/** + * Marker trait for write operations that participate in a DSv2 transaction. + * + * Implementations are expected to target a DSv2 catalog backed by a + * [[org.apache.spark.sql.connector.catalog.TransactionalCatalogPlugin]].
+ */ +trait TransactionalWrite extends LogicalPlan { + def table: LogicalPlan +} + +/** Trait for streaming write commands that participate in DSv2 transactions. */ +trait StreamingV2WriteCommand extends TransactionalWrite { Review Comment: Is the question about naming? I renamed it to `V2StreamingWriteCommand` to be consistent with the other commands. I also moved it right after `V2WriteCommand`. Is this what you had in mind? ########## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ########## @@ -1024,17 +1053,43 @@ class Analyzer( // Unwrap temp views storing analyzed plans and resolve V2TableReference nodes in the child. private def unwrapRelationPlan(plan: LogicalPlan): LogicalPlan = { EliminateSubqueryAliases(plan) match { - case v: View if v.isTempViewStoringAnalyzedPlan => resolveTableReferences(v.child) + case v: View if v.isTempViewStoringAnalyzedPlan => resolveTableReferencesInTempView(v.child) case other => other } } - // Resolve V2TableReference nodes in a plan. V2TableReference is only created for temp views - // (via V2TableReference.createForTempView), so we only need to resolve it when returning + // Resolve the write target of a V2 write command (batch or streaming). + private def resolveWriteTarget( Review Comment: The `UnresolvedRelation` we create in `MicroBatchExecution` has `isStreaming = false`. It is therefore matched by the `!u.isStreaming` guard in `resolveWriteTarget` and resolved to a `DataSourceV2Relation`. The alternative would be to set `isStreaming` to true on that `UnresolvedRelation`. It would then be resolved to a `StreamingRelationV2` instead, so we would need to add a new case that eventually resolves it to a `DataSourceV2Relation` (we would also need to drop the `!u.isStreaming` guard). IIUC, streaming is a read-side concept. From the write perspective, we write batches, which are no longer streams.
I think this is the reason we originally create a `DataSourceV2Relation` in `MicroBatchExecution` instead of a `StreamingDataSourceV2Relation`. So the path `UnresolvedRelation` with `isStreaming = true` -> `StreamingRelationV2` -> `StreamingDataSourceV2Relation` does not apply here. I added a comment to explain this. Thoughts? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
