szehon-ho commented on code in PR #56042:
URL: https://github.com/apache/spark/pull/56042#discussion_r3291326172
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/CoreDataflowNodeProcessor.scala:
##########
@@ -199,9 +199,18 @@ private class FlowResolver(rawGraph: DataflowGraph) {
}
}
- private def convertResolvedToTypedFlow(
+ private def transformUnresolvedFlowToResolvedFlow(
flow: UnresolvedFlow,
funcResult: FlowFunctionResult): ResolvedFlow = {
+ flow match {
+ case acf: AutoCdcFlow => new AutoCdcMergeFlow(acf, funcResult)
Review Comment:
`UntypedFlow` resolution uses `funcResult.dataFrame.get.isStreaming` to
choose `StreamingFlow` vs `CompleteFlow`, but `AutoCdcFlow` always becomes
`AutoCdcMergeFlow` regardless of whether the source is streaming.
That means an `AutoCdcFlow` with a **batch** source can still resolve
successfully when the destination is a non-streaming table (e.g. a materialized
view), because `validateFlowStreamingness` only rejects streaming sources for
MVs; it has no AutoCDC-specific check that rejects batch sources.
Since this PR documents that AutoCDC is streaming-only (`once = false`,
class-level comments), consider enforcing `df.isStreaming` here (or in
`validateFlowStreamingness` with an AutoCDC-specific check), e.g.:
```scala
case acf: AutoCdcFlow =>
if (!funcResult.dataFrame.get.isStreaming) {
throw new AnalysisException(
errorClass = "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
messageParameters = Map("flowIdentifier" ->
acf.identifier.quotedString)
)
}
new AutoCdcMergeFlow(acf, funcResult)
```
Happy to defer to the execution PR if you prefer, but worth tracking so we
don't rely on destination-type checks alone.
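For illustration, here is a dependency-free sketch of the dispatch order suggested above. Typed AutoCDC flows are checked for streaming-ness before being wrapped, and untyped flows keep choosing between streaming and complete execution. Every type in it (`SourceResult`, the `*Model` classes, `FlowAnalysisError`) is a hypothetical stand-in for the real pipeline classes and `AnalysisException`, not Spark API.

```scala
// Hypothetical stand-ins for the pipeline types discussed above; NOT Spark's
// classes, just enough structure to show where the streaming check sits.
final case class SourceResult(isStreaming: Boolean)

sealed trait UnresolvedFlowModel { def id: String }
final case class UntypedFlowModel(id: String) extends UnresolvedFlowModel
final case class AutoCdcFlowModel(id: String) extends UnresolvedFlowModel

sealed trait ResolvedFlowModel
final case class StreamingFlowModel(id: String) extends ResolvedFlowModel
final case class CompleteFlowModel(id: String) extends ResolvedFlowModel
final case class AutoCdcMergeFlowModel(id: String) extends ResolvedFlowModel

// Carries a stable error class plus parameters, mirroring the structured-error style.
final class FlowAnalysisError(val errorClass: String, val params: Map[String, String])
  extends RuntimeException(s"[$errorClass] $params")

object FlowResolverModel {
  def resolve(flow: UnresolvedFlowModel, result: SourceResult): ResolvedFlowModel =
    flow match {
      case acf: AutoCdcFlowModel =>
        // Reject batch sources here, independently of the destination type.
        if (!result.isStreaming) {
          throw new FlowAnalysisError(
            "INVALID_FLOW_QUERY_TYPE.BATCH_RELATION_FOR_AUTOCDC_FLOW",
            Map("flowIdentifier" -> acf.id))
        }
        AutoCdcMergeFlowModel(acf.id)
      case uf: UntypedFlowModel =>
        // Untyped flows derive their execution type from the source's streaming-ness.
        if (result.isStreaming) StreamingFlowModel(uf.id) else CompleteFlowModel(uf.id)
    }
}
```

Doing the check at resolution time means a batch AutoCDC source fails regardless of which kind of table it targets, which is the gap the destination-type checks leave open.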
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala:
##########
@@ -194,3 +243,108 @@ class AppendOnceFlow(
override val once = true
}
+
+/**
+ * A resolved flow that applies a CDC event stream to a target table via MERGE, in accordance to
+ * the configured [[flow.changeArgs]].
+ */
+class AutoCdcMergeFlow(
+ val flow: AutoCdcFlow,
+ val funcResult: FlowFunctionResult
+) extends ResolvedFlow {
+ requireReservedPrefixAbsentInSourceColumns()
+
+ def changeArgs: ChangeArgs = flow.changeArgs
+
+ /**
+ * Returns the augmented output schema of this flow, which can differ from the schema of the
+ * source change-data-feed dataframe.
+ *
+ * The source dataframe's schema describes the incoming CDC events; the augmented schema here
+ * applies the user-specified [[ColumnSelection]] and appends the SCD-specific metadata
+ * columns that the AutoCDC MERGE engine projects onto the target table. Downstream
+ * dependencies in the pipeline see this augmented schema.
+ */
+ override val schema: StructType = {
+ val userSelectedSchema = ColumnSelection.applyToSchema(
+ schemaName = "changeDataFeed",
+ schema = df.schema,
+ columnSelection = changeArgs.columnSelection,
+ caseSensitive = spark.sessionState.conf.caseSensitiveAnalysis
+ )
+
+ // AutoCDC flows require all key columns to be present in the target table, to adhere to SCD
+ // semantics.
+ requireKeysPresentInSelectedSchema(userSelectedSchema)
+
+ changeArgs.storedAsScdType match {
+ case ScdType.Type1 =>
+ // SCD1 produces a target table with all the user-selected output columns and a projected
+ // CDC operational metadata column at the end.
+ StructType(
+ userSelectedSchema.fields :+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(
+ sequencingType = df.select(changeArgs.sequencing).schema.head.dataType
+ ),
+ nullable = false
+ )
+ )
+ case ScdType.Type2 =>
+ throw new UnsupportedOperationException(
Review Comment:
Other AutoCDC validation in this PR uses `AnalysisException` with structured
error classes (`AUTOCDC_*`). SCD2 throws a bare `UnsupportedOperationException`
here.
For consistency with SDP/Connect error handling (and so clients get a stable
`errorClass`), consider something like:
```scala
case ScdType.Type2 =>
throw new AnalysisException(
errorClass = "AUTOCDC_SCD2_NOT_SUPPORTED",
messageParameters = Map.empty
)
```
(with a corresponding entry in `error-conditions.json`). The eager failure
at `AutoCdcMergeFlow` construction is good; only the exception type differs.
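If the exception type changes, the existing test "AutoCdcMergeFlow rejects SCD2 at construction with UnsupportedOperationException", which matches on message text, would need updating too, presumably to `checkError` like the reserved-prefix tests. The sketch below models the idea without Spark: callers and tests assert on a stable error class rather than on message text. `ScdTypeModel`, `StructuredError`, `targetColumns`, and `errorClassOf` are hypothetical; the `_cdc_metadata` column name is taken from the test names in this PR.

```scala
// Hypothetical stand-ins; not Spark's ScdType or AnalysisException.
sealed trait ScdTypeModel
object ScdTypeModel {
  case object Type1 extends ScdTypeModel
  case object Type2 extends ScdTypeModel
}

// A stable, machine-readable error class plus parameters, in the spirit of the AUTOCDC_* errors.
final class StructuredError(val errorClass: String, val messageParameters: Map[String, String])
  extends RuntimeException(s"[$errorClass]")

object ScdSchemaModel {
  val cdcMetadataCol = "_cdc_metadata"

  // SCD1 appends the metadata column; SCD2 fails eagerly with a structured error.
  def targetColumns(selected: Seq[String], scdType: ScdTypeModel): Seq[String] =
    scdType match {
      case ScdTypeModel.Type1 => selected :+ cdcMetadataCol
      case ScdTypeModel.Type2 =>
        throw new StructuredError("AUTOCDC_SCD2_NOT_SUPPORTED", Map.empty)
    }

  // Test-side helper: report the error class (if any) instead of inspecting message text.
  def errorClassOf(thunk: => Any): Option[String] =
    try { thunk; None }
    catch { case e: StructuredError => Some(e.errorClass) }
}
```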
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/Flow.scala:
##########
@@ -121,15 +129,56 @@ case class FlowFunctionResult(
}
/** A [[Flow]] whose output schema and dependencies aren't known. */
-case class UnresolvedFlow(
+sealed trait UnresolvedFlow extends Flow {
+ /** Returns a copy of this flow with the given SQL confs overriding the existing ones. */
+ def withSqlConf(newSqlConf: Map[String, String]): UnresolvedFlow
+}
+
+/**
+ * An [[UnresolvedFlow]] whose execution-type has not yet been determined.
+ *
+ * In some cases, we know the execution-type for an [[UnresolvedFlow]] even before flow analysis
+ * and resolution. For example an AutoCDCFlow is a special unresolved-but-typed flow; we know a
+ * flow will be an AutoCDC flow immediately on construction, because it has its own special
+ * registration API. Such flows are considered "typed flows", but there isn't any semantic reason
+ * yet to explicitly introduce a `TypedFlow` trait/class.
+ */
+case class UntypedFlow(
identifier: TableIdentifier,
destinationIdentifier: TableIdentifier,
func: FlowFunction,
queryContext: QueryContext,
sqlConf: Map[String, String],
override val once: Boolean,
override val origin: QueryOrigin
-) extends Flow
+) extends UnresolvedFlow {
+ override def withSqlConf(newSqlConf: Map[String, String]): UntypedFlow =
+ copy(sqlConf = newSqlConf)
+}
+
+/**
+ * An unresolved but typed that applies a CDC event stream to a target table via MERGE.
Review Comment:
Nit: missing word in the scaladoc — should be something like:
> An unresolved but typed **flow** that applies a CDC event stream to a target table via MERGE.
##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/graph/GraphValidations.scala:
##########
@@ -58,6 +76,13 @@ trait GraphValidations extends Logging {
multiQueryTables
}
+ /** Returns true iff the given flow is an [[AutoCdcFlow]] (resolved or not). */
+ private def isAutoCdcFlow(f: Flow): Boolean = f match {
Review Comment:
The current logic works because `AutoCdcMergeFlow` extends
`ResolutionCompletedFlow` and `rcf.flow` is an `AutoCdcFlow`. For readability,
consider making the resolved case explicit:
```scala
private def isAutoCdcFlow(f: Flow): Boolean = f match {
case _: AutoCdcFlow | _: AutoCdcMergeFlow => true
case rcf: ResolutionCompletedFlow => rcf.flow.isInstanceOf[AutoCdcFlow]
case _ => false
}
```
Or, if you only ever see unresolved `AutoCdcFlow` and resolved
`AutoCdcMergeFlow` in `flowsTo`, the middle case might be redundant; either way,
a quick comment on which shapes appear at validation time would help.
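A small model of the explicit match, assuming the hierarchy described above (the resolved AutoCDC flow is itself a `ResolutionCompletedFlow` wrapping an `AutoCdcFlow`). The `*M` types are hypothetical stand-ins, not the real `Flow` classes. Because the explicit `AutoCdcMergeFlowM` alternative comes first, the generic `ResolutionCompletedFlowM` case is reached only by other resolved shapes, and returns true only if one of them wraps an `AutoCdcFlowM`.

```scala
// Hypothetical mini-hierarchy mirroring the shapes named in the comment; not the real Flow classes.
sealed trait FlowM
final case class AutoCdcFlowM(id: String) extends FlowM
final case class UntypedFlowM(id: String) extends FlowM

// Resolved flows keep a reference to the unresolved flow they came from.
sealed trait ResolutionCompletedFlowM extends FlowM { def flow: FlowM }
final case class AutoCdcMergeFlowM(flow: AutoCdcFlowM) extends ResolutionCompletedFlowM
final case class CompleteFlowM(flow: UntypedFlowM) extends ResolutionCompletedFlowM

object FlowChecks {
  def isAutoCdcFlow(f: FlowM): Boolean = f match {
    // Unresolved and resolved AutoCDC shapes, matched explicitly.
    case _: AutoCdcFlowM | _: AutoCdcMergeFlowM => true
    // Any other resolved flow: fall back to inspecting what it wraps.
    case rcf: ResolutionCompletedFlowM => rcf.flow.isInstanceOf[AutoCdcFlowM]
    case _ => false
  }
}
```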
##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcFlowSuite.scala:
##########
@@ -0,0 +1,550 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import java.util.Locale
+
+import scala.util.Success
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Column, QueryTest}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.pipelines.graph.{
+ AutoCdcFlow,
+ AutoCdcMergeFlow,
+ FlowFunction,
+ FlowFunctionResult,
+ Input,
+ QueryContext,
+ QueryOrigin
+}
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{DataType, IntegerType, LongType, StringType, StructField, StructType}
+
+/**
+ * Unit tests for the [[AutoCdcFlow]] data class and the augmented schema computed by
+ * [[AutoCdcMergeFlow]]. The tests stop at the data-class / schema surface; they do not
+ * exercise the full pipeline-graph resolution machinery (which is not yet wired up to AutoCDC
+ * flows).
+ */
+class AutoCdcFlowSuite extends QueryTest with SharedSparkSession {
+
+ private val testIdentifier = TableIdentifier("cdc_target", Some("db"))
+
+ /** A no-op [[FlowFunction]] that throws if invoked; AutoCdcFlow tests should never call it. */
+ private val noOpFlowFunction: FlowFunction = new FlowFunction {
+ override def call(
+ allInputs: Set[TableIdentifier],
+ availableInputs: Seq[Input],
+ configuration: Map[String, String],
+ queryContext: QueryContext,
+ queryOrigin: QueryOrigin): FlowFunctionResult =
+ throw new UnsupportedOperationException(
+ "noOpFlowFunction.call should not be invoked from AutoCdcFlowSuite tests"
+ )
+ }
+
+ private val testQueryContext =
+ QueryContext(currentCatalog = Some("test_catalog"), currentDatabase = Some("test_db"))
+
+ private val testChangeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+
+ private def newAutoCdcFlow(
+ identifier: TableIdentifier = testIdentifier,
+ destinationIdentifier: TableIdentifier = testIdentifier,
+ func: FlowFunction = noOpFlowFunction,
+ queryContext: QueryContext = testQueryContext,
+ sqlConf: Map[String, String] = Map.empty,
+ comment: Option[String] = None,
+ origin: QueryOrigin = QueryOrigin.empty,
+ changeArgs: ChangeArgs = testChangeArgs): AutoCdcFlow = {
+ AutoCdcFlow(
+ identifier = identifier,
+ destinationIdentifier = destinationIdentifier,
+ func = func,
+ queryContext = queryContext,
+ sqlConf = sqlConf,
+ comment = comment,
+ origin = origin,
+ changeArgs = changeArgs
+ )
+ }
+
+ test("AutoCdcFlow exposes its constructor fields") {
+ val flow = newAutoCdcFlow(
+ sqlConf = Map("spark.sql.shuffle.partitions" -> "8"),
+ comment = Some("my CDC flow")
+ )
+
+ assert(flow.identifier == testIdentifier)
+ assert(flow.destinationIdentifier == testIdentifier)
+ assert(flow.func eq noOpFlowFunction)
+ assert(flow.queryContext == testQueryContext)
+ assert(flow.sqlConf == Map("spark.sql.shuffle.partitions" -> "8"))
+ assert(flow.comment.contains("my CDC flow"))
+ assert(flow.origin == QueryOrigin.empty)
+ assert(flow.changeArgs == testChangeArgs)
+ }
+
+ test("AutoCdcFlow defaults sqlConf to empty and comment to None") {
+ // Confirms the case-class default values match the documented contract; downstream
+ // registration code relies on `sqlConf` being a non-null empty map by default so that
+ // `defaultSqlConf ++ flowDef.sqlConf` is well-defined in [[GraphRegistrationContext]].
+ val flow = AutoCdcFlow(
+ identifier = testIdentifier,
+ destinationIdentifier = testIdentifier,
+ func = noOpFlowFunction,
+ queryContext = testQueryContext,
+ origin = QueryOrigin.empty,
+ changeArgs = testChangeArgs
+ )
+
+ assert(flow.sqlConf.isEmpty)
+ assert(flow.comment.isEmpty)
+ }
+
+ test("AutoCdcFlow.once is always false") {
+ // AutoCDC flows are streaming-only and must run on every batch trigger, never as a
+ // one-shot full-refresh-style flow. Locking this in so a future refactor doesn't
+ // accidentally make `once` configurable.
+
+ // In the future we may intentionally add [[once]] support for AutoCDC flows, at which point
+ // this test can safely be removed.
+ val flow = newAutoCdcFlow()
+ assert(!flow.once)
+ }
+
+ test("AutoCdcFlow.withSqlConf returns a new instance with the updated sqlConf") {
+ val original = newAutoCdcFlow(sqlConf = Map("a" -> "1"))
+ val updated = original.withSqlConf(Map("b" -> "2"))
+
+ assert(updated.sqlConf == Map("b" -> "2"))
+ // All other fields should be preserved verbatim.
+ assert(updated.identifier == original.identifier)
+ assert(updated.destinationIdentifier == original.destinationIdentifier)
+ assert(updated.func eq original.func)
+ assert(updated.queryContext == original.queryContext)
+ assert(updated.comment == original.comment)
+ assert(updated.origin == original.origin)
+ assert(updated.changeArgs == original.changeArgs)
+ // The original must not be mutated.
+ assert(original.sqlConf == Map("a" -> "1"))
+ }
+
+ // ===========================================================================================
+ // AutoCdcMergeFlow.schema tests
+ // ===========================================================================================
+
+ /** Materializes a successful [[FlowFunctionResult]] backed by the given source dataframe. */
+ private def successfulFuncResult(sourceDf: DataFrame): FlowFunctionResult =
+ FlowFunctionResult(
+ requestedInputs = Set.empty,
+ batchInputs = Set.empty,
+ streamingInputs = Set.empty,
+ usedExternalInputs = Set.empty,
+ dataFrame = Success(sourceDf),
+ sqlConf = Map.empty
+ )
+
+ /** Builds a [[AutoCdcMergeFlow]] over the given source dataframe + change args. */
+ private def newAutoCdcMergeFlow(
+ sourceDf: DataFrame,
+ keys: Seq[UnqualifiedColumnName] = Seq(UnqualifiedColumnName("id")),
+ sequencing: Column = F.col("seq"),
+ storedAsScdType: ScdType = ScdType.Type1,
+ columnSelection: Option[ColumnSelection] = None): AutoCdcMergeFlow = {
+ val flow = newAutoCdcFlow(
+ changeArgs = ChangeArgs(
+ keys = keys,
+ sequencing = sequencing,
+ storedAsScdType = storedAsScdType,
+ columnSelection = columnSelection
+ )
+ )
+ new AutoCdcMergeFlow(flow, successfulFuncResult(sourceDf))
+ }
+
+ /** A stable 3-column source CDF schema used across most schema tests. */
+ private def threeColumnSourceDf(): DataFrame = {
+ val schema = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType)
+ .add("seq", LongType)
+ spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema)
+ }
+
+ /** Convenience to extract the [[StructType]] of the projected `_cdc_metadata` column. */
+ private def cdcMetadataStruct(schema: StructType): StructType =
+ schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+
+ test(
+ "AutoCdcMergeFlow.schema appends _cdc_metadata to the source schema when no " +
+ "columnSelection is set"
+ ) {
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("name", StringType)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test("AutoCdcMergeFlow.schema applies an IncludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("seq"))
+ )
+ )
+ )
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test("AutoCdcMergeFlow.schema applies an ExcludeColumns selection") {
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+ )
+ )
+
+ val expected = new StructType()
+ .add("id", IntegerType, nullable = false)
+ .add("seq", LongType)
+ .add(
+ StructField(
+ Scd1BatchProcessor.cdcMetadataColName,
+ Scd1BatchProcessor.cdcMetadataColSchema(LongType),
+ nullable = false
+ )
+ )
+ assert(resolvedFlow.schema == expected)
+ }
+
+ test(
+ "AutoCdcMergeFlow.schema's _cdc_metadata struct uses the resolved sequencing data type"
+ ) {
+ // Source has a Long `seq` column; sequencing is `cast(seq as int)`, so the projected
+ // `_cdc_metadata` fields should be Int (not Long), demonstrating that the sequencing
+ // expression's *resolved* type drives the metadata schema.
+ val resolvedFlow = newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ sequencing = F.col("seq").cast(IntegerType)
+ )
+
+ val metaStruct = cdcMetadataStruct(resolvedFlow.schema)
+ assert(metaStruct == Scd1BatchProcessor.cdcMetadataColSchema(IntegerType))
+ }
+
+ test("AutoCdcMergeFlow.schema's _cdc_metadata field is non-null with nullable inner fields") {
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+
+ val metaField = resolvedFlow.schema(Scd1BatchProcessor.cdcMetadataColName)
+ assert(!metaField.nullable, "_cdc_metadata column itself must be non-null")
+
+ val metaStruct = metaField.dataType.asInstanceOf[StructType]
+ assert(metaStruct(Scd1BatchProcessor.cdcDeleteSequenceFieldName).nullable)
+ assert(metaStruct(Scd1BatchProcessor.cdcUpsertSequenceFieldName).nullable)
+ }
+
+ test("AutoCdcMergeFlow.schema is stable across reads") {
+ // The schema computation calls `df.select(sequencing).schema`, which triggers Spark
+ // analysis. The eagerly-initialized `val` caches the result so downstream consumers get
+ // a stable schema instance across reads.
+ val resolvedFlow = newAutoCdcMergeFlow(threeColumnSourceDf())
+ val first = resolvedFlow.schema
+ val second = resolvedFlow.schema
+ assert(first eq second, "schema should be cached as a val and return the same instance")
+ }
+
+ test("AutoCdcMergeFlow rejects SCD2 at construction with UnsupportedOperationException") {
+ // Constructing the flow forces the resolved schema, which is unsupported for SCD2 today.
+ // Failing eagerly (rather than deferring to the first downstream `schema` read) is the
+ // intended UX -- pipeline graph analysis should not be able to register an SCD2 AutoCDC
+ // flow at all.
+ val ex = intercept[UnsupportedOperationException] {
+ newAutoCdcMergeFlow(
+ sourceDf = threeColumnSourceDf(),
+ storedAsScdType = ScdType.Type2
+ )
+ }
+ assert(
+ ex.getMessage.contains("AutoCDC flows do not currently support SCD Type 2 transformations.")
+ )
+ }
+
+ // ===========================================================================================
+ // AutoCdcMergeFlow reserved-prefix validation tests
+ //
+ // The two "contract:" tests below lock in the high-level invariant that no reserved-prefix
+ // column name can be referenced anywhere -- not in the source change-data feed schema, and
+ // not in user-supplied [[ChangeArgs]] (keys or columnSelection). Together they ensure that
+ // (a) users cannot opt out of the reserved CDC metadata column by omitting it from the
+ // selected schema, and (b) users cannot opt in to (or out of) any other reserved-prefix
+ // name we may reserve in the future for an internal CDC concern.
+ //
+ // The remaining tests pin down case-sensitivity nuances of the source-schema validator.
+ // ===========================================================================================
+
+ /** Builds an empty source df with `id` + `seq` + the supplied extra columns. */
+ private def sourceDfWithExtraColumns(extraColumns: (String, DataType)*): DataFrame = {
+ val schema = extraColumns.foldLeft(
+ new StructType().add("id", IntegerType, nullable = false).add("seq", LongType)
+ ) { case (acc, (name, dt)) => acc.add(name, dt) }
+ spark.createDataFrame(spark.sparkContext.emptyRDD[org.apache.spark.sql.Row], schema)
+ }
+
+ test(
+ "Contract: a source df column with the reserved AutoCDC prefix is rejected at flow " +
+ "construction"
+ ) {
+ val conflictingName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+ val sourceDf = sourceDfWithExtraColumns(conflictingName -> StringType)
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(sourceDf)
+ },
+ condition = "AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT",
+ sqlState = "42710",
+ parameters = Map(
+ "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+ "columnName" -> conflictingName,
+ "schemaName" -> "changeDataFeed",
+ "reservedColumnNamePrefix" -> Scd1BatchProcessor.reservedColumnNamePrefix
+ )
+ )
+ }
+
+ test(
+ "Contract: ChangeArgs referencing a reserved-prefix column is rejected even when the " +
+ "source df is clean"
+ ) {
+ // The source df has no reserved-prefix columns, but referencing a reserved-prefix column
+ // from any ChangeArgs path still fails at construction with a different error. The
+ // reservation is on the name itself, not on its presence in the source feed.
+ val cleanSourceDf = threeColumnSourceDf()
+ val reservedName = s"${Scd1BatchProcessor.reservedColumnNamePrefix}foo"
+
+ val keysEx = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = cleanSourceDf,
+ keys = Seq(UnqualifiedColumnName(reservedName))
+ )
+ }
+ assert(keysEx.getCondition == "AUTOCDC_KEY_NOT_IN_SELECTED_SCHEMA")
+
+ val includeEx = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = cleanSourceDf,
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName(reservedName))
+ )
+ )
+ )
+ }
+ assert(includeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA")
+
+ val excludeEx = intercept[AnalysisException] {
+ newAutoCdcMergeFlow(
+ sourceDf = cleanSourceDf,
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName(reservedName)))
+ )
+ )
+ }
+ assert(excludeEx.getCondition == "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA")
+ }
+
+ test("AutoCdcMergeFlow rejects a source df column starting with the reserved prefix") {
Review Comment:
These two tests look identical (same setup, same `checkError` for
`AUTOCDC_RESERVED_COLUMN_NAME_PREFIX_CONFLICT`):
- ~338: "Contract: a source df column with the reserved AutoCDC prefix..."
- ~400: this test
Suggest keeping the "Contract:" test and removing this duplicate, unless you
intended to split case-sensitive vs case-insensitive behavior (those are
already covered by the tests below).
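If the case-sensitivity split was the intent, a table-driven check is one way to cover both modes without near-duplicate tests. This is a dependency-free sketch: the `__cdc_` prefix and `hasReservedPrefix` are hypothetical (the real prefix is `Scd1BatchProcessor.reservedColumnNamePrefix`, whose value isn't shown in this diff), and the case-sensitive semantics below are an assumption, not taken from the actual validator.

```scala
import java.util.Locale

object ReservedPrefixModel {
  // Hypothetical prefix; the real value lives in Scd1BatchProcessor.reservedColumnNamePrefix.
  val reservedPrefix = "__cdc_"

  // Assumed semantics: exact prefix match when case-sensitive, lower-cased comparison otherwise.
  def hasReservedPrefix(column: String, caseSensitive: Boolean): Boolean =
    if (caseSensitive) column.startsWith(reservedPrefix)
    else column.toLowerCase(Locale.ROOT).startsWith(reservedPrefix.toLowerCase(Locale.ROOT))

  // One row per behavior: (column name, caseSensitive, expected conflict).
  val cases: Seq[(String, Boolean, Boolean)] = Seq(
    ("__cdc_foo", false, true),
    ("__CDC_foo", false, true),
    ("__cdc_foo", true, true),
    ("__CDC_foo", true, false),
    ("name", false, false)
  )

  // Rows whose observed result disagrees with the expectation; empty means all rows pass.
  def mismatches: Seq[(String, Boolean, Boolean)] =
    cases.filter { case (col, cs, expected) => hasReservedPrefix(col, cs) != expected }
}
```

In the real suite each row would presumably become one `checkError` (or absence-of-error) assertion, so the single "Contract:" test plus this table would replace the duplicate.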