This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new cd26a8478144 [SPARK-56870][SDP] Implement SCD1 Batch Processor; Extend 
Microbatch with CDC Metadata
cd26a8478144 is described below

commit cd26a847814475fe166df43a673e64aa32e5ba88
Author: AnishMahto <[email protected]>
AuthorDate: Fri May 22 10:45:05 2026 -0700

    [SPARK-56870][SDP] Implement SCD1 Batch Processor; Extend Microbatch with 
CDC Metadata
    
    Approved AutoCDC SPIP: 
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
    
    --------
    
    **Preamble:**
    
    The SCD type 1 flow is a foreachBatch streaming query on an input 
change-data-feed, and is responsible for reconciling the incoming change data 
onto some target table that follows SCD1 replication semantics.
    
    SCD1 flows also maintain an "auxiliary" table to keep track of 
early-arriving out-of-order received events state. Each microbatch will need to 
reconcile against this auxiliary table as well, and update the auxiliary 
table's state appropriately for future microbatches.
    
    **Extend Microbatch with CDC Metadata:**
    
    After deduplication, all of the incoming rows can be classified as either a 
delete event or an upsert event (mutually exclusive), and there's at most one 
per key.
    
    If we identify a row as a delete event, remember its sequencing as its 
`deleteSequence`. If we identify a row as an upsert event, remember its 
sequencing as its `upsertSequence`. That is, `deleteSequence`/`upsertSequence` 
encode both the sequencing for the row as well as the row classification 
(delete or upsert).
    
    We need to persist this encoded information now, because in future stages 
we may drop the columns that `deleteCondition` needed to do the classification 
in the first place, depending on which columns were selected by 
`ChangeArgs.columnSelection`.
    
    **Where is the CDC Metadata stored?**
    
    Within the microbatch, we append a `_cdc_metadata` struct column, that 
stores the `deleteSequence` and `upsertSequence`.
    
    This `_cdc_metadata` column will eventually also land in the persisted 
target and auxiliary tables, which are the artifacts of an AutoCDC flow. This 
column represents operational metadata that the AutoCDC flow has tagged a row 
with, and is necessary for out-of-order correctness of the SCD decomposition. 
    
    Users will not be able to opt out of persisting this column in the target 
table using `ChangeArgs.columnSelection`, as it is necessary for correctness. 
The column will not have a stable public contract, and users should make no 
assumptions on its contents.
    
    Closes #55970 from 
AnishMahto/SPARK-56870-extend-microbatch-with-cdc-metadata.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
    (cherry picked from commit 12807c527ec0cd6513ddedc3895552724e46a870)
    Signed-off-by: DB Tsai <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |   6 +
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 118 +++++++-
 .../autocdc/Scd1BatchProcessorSuite.scala          | 312 ++++++++++++++++++++-
 3 files changed, 419 insertions(+), 17 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index 34e48dec668a..3dcab86b04a5 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -209,6 +209,12 @@
     ],
     "sqlState" : "42703"
   },
+  "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT" : {
+    "message" : [
+      "Using <caseSensitivity> column name comparison, the column 
`<columnName>` in the <schemaName> schema conflicts with the reserved AutoCDC 
column name `<reservedColumnName>`. Rename or remove the column."
+    ],
+    "sqlState" : "42710"
+  },
   "AVRO_CANNOT_WRITE_NULL_FIELD" : {
     "message" : [
       "Cannot write null value for field <name> defined as non-null Avro data 
type <dataType>.",
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index f87a4a1da53d..d50c30919ba8 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -17,16 +17,26 @@
 
 package org.apache.spark.sql.pipelines.autocdc
 
-import org.apache.spark.sql.{functions => F}
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{functions => F, AnalysisException}
+import org.apache.spark.sql.Column
 import org.apache.spark.sql.catalyst.util.QuotingUtils
 import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.util.ArrayImplicits._
 
 /**
  * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the 
specified [[changeArgs]]
  * configuration.
+ *
+ * @param changeArgs The CDC flow configuration.
+ * @param resolvedSequencingType The post-analysis [[DataType]] of the 
sequencing column, derived
+ *                               from the flow's resolved DataFrame at flow 
setup time.
  */
-case class Scd1BatchProcessor(changeArgs: ChangeArgs) {
+case class Scd1BatchProcessor(
+    changeArgs: ChangeArgs,
+    resolvedSequencingType: DataType) {
+
   /**
    * Deduplicate the incoming CDC microbatch by key, keeping the most recent 
event per key
    * as ordered by [[ChangeArgs.sequencing]].
@@ -59,9 +69,111 @@ case class Scd1BatchProcessor(changeArgs: ChangeArgs) {
       )
       .select(F.col(s"$winningRowCol.*"))
   }
+
+  /**
+   * Project the CDC metadata column onto the microbatch.
+   *
+   * This must run before any column selection is applied to the microbatch. 
The
+   * [[ChangeArgs.deleteCondition]] and [[ChangeArgs.sequencing]] expressions 
are evaluated against
+   * the current microbatch schema, and column selection may drop inputs 
required by those
+   * expressions.
+   *
+   * Rows are classified as deletes only when [[ChangeArgs.deleteCondition]] 
evaluates to true. A
+   * false or null delete condition classifies the row as an upsert.
+   *
+   * @param validatedMicrobatch A microbatch that has already been validated 
such that the
+   *                            sequencing column should not contain null 
values, and its data type
+   *                            should support ordering.
+   *
+   * The returned dataframe has all of the columns in the input microbatch + 
the CDC metadata
+   * column.
+   */
+  def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): 
DataFrame = {
+    // Proactively validate the reserved CDC metadata column does not exist in 
the microbatch.
+    validateCdcMetadataColumnNotPresent(validatedMicrobatch)
+
+    val rowDeleteSequence: Column = changeArgs.deleteCondition match {
+      case Some(deleteCondition) =>
+        F.when(deleteCondition, changeArgs.sequencing).otherwise(F.lit(null))
+      case None =>
+        F.lit(null)
+    }
+
+    val rowUpsertSequence: Column =
+      // A row that is not a delete must be an upsert, these are mutually 
exclusive and a complete
+      // set of CDC event types.
+      F.when(rowDeleteSequence.isNull, 
changeArgs.sequencing).otherwise(F.lit(null))
+
+    validatedMicrobatch.withColumn(
+      Scd1BatchProcessor.cdcMetadataColName,
+      Scd1BatchProcessor.constructCdcMetadataCol(
+        deleteSequence = rowDeleteSequence,
+        upsertSequence = rowUpsertSequence,
+        sequencingType = resolvedSequencingType
+      )
+    )
+  }
+
+  private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit 
= {
+    val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
+    val resolver = microbatchSqlConf.resolver
+
+    microbatch.schema.fieldNames
+      .find(resolver(_, Scd1BatchProcessor.cdcMetadataColName))
+      .foreach { conflictingColumnName =>
+        throw new AnalysisException(
+          errorClass = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
+          messageParameters = Map(
+            "caseSensitivity" -> 
CaseSensitivityLabels.of(microbatchSqlConf.caseSensitiveAnalysis),
+            "columnName" -> conflictingColumnName,
+            "schemaName" -> "microbatch",
+            "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
+          )
+        )
+      }
+  }
 }
 
 object Scd1BatchProcessor {
   // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP 
AutoCDC processing.
-  private[autocdc] val winningRowColName = "__spark_autocdc_winning_row"
+  private[autocdc] val winningRowColName: String = 
"__spark_autocdc_winning_row"
+  private[autocdc] val cdcMetadataColName: String = "__spark_autocdc_metadata"
+
+  private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
+  private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
+
+  /**
+   * Schema of the CDC metadata struct column for SCD1.
+   */
+  private def cdcMetadataColSchema(sequencingType: DataType): StructType =
+    StructType(
+      Seq(
+        // The sequencing of the event if it represents a delete, null 
otherwise.
+        StructField(cdcDeleteSequenceFieldName, sequencingType, nullable = 
true),
+        // The sequencing of the event if it represents an upsert, null 
otherwise.
+        StructField(cdcUpsertSequenceFieldName, sequencingType, nullable = 
true)
+      )
+    )
+
+  /**
+   * Construct the CDC metadata struct column for SCD1, following the exact 
schema and field
+   * ordering defined by [[cdcMetadataColSchema]].
+   */
+  private[autocdc] def constructCdcMetadataCol(
+      deleteSequence: Column,
+      upsertSequence: Column,
+      sequencingType: DataType): Column = {
+    val cdcMetadataFieldsInOrder = 
cdcMetadataColSchema(sequencingType).fields.map { field =>
+      val value = field.name match {
+        case `cdcDeleteSequenceFieldName` => deleteSequence
+        case `cdcUpsertSequenceFieldName` => upsertSequence
+        case other =>
+          throw SparkException.internalError(
+            s"Unable to construct SCD1 CDC metadata column due to unknown 
`${other}` field."
+          )
+      }
+      value.cast(field.dataType).as(field.name)
+    }
+    F.struct(cdcMetadataFieldsInOrder.toImmutableArraySeq: _*)
+  }
 }
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index 208c0aa1e4c5..1cb348316436 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.sql.pipelines.autocdc
 
+import java.util.Locale
+
 import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
 
@@ -53,7 +56,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -77,7 +81,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -102,7 +107,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     // On equal sequence number events for the same key we provide no 
guarantee on which event will
@@ -133,7 +139,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -160,7 +167,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
     checkAnswer(
       df = processor.deduplicateMicrobatch(batch),
@@ -187,7 +195,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -217,7 +226,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     // All non-key columns must come from the row with the largest sequence 
value, never
@@ -247,7 +257,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -277,7 +288,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("region"), 
UnqualifiedColumnName("customer_id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -313,7 +325,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.greatest(F.col("seq"), F.col("alt_seq")),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -338,7 +351,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("`user.id`")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkAnswer(
@@ -367,7 +381,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName(reservedColName)),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     checkError(
@@ -402,7 +417,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     // Field names and dataTypes must match the input exactly, in the original 
order.
@@ -424,11 +440,279 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         keys = Seq(UnqualifiedColumnName("id")),
         sequencing = F.col("seq"),
         storedAsScdType = ScdType.Type1
-      )
+      ),
+      resolvedSequencingType = LongType
     )
 
     val result = processor.deduplicateMicrobatch(batch)
     assert(result.collect().isEmpty)
     assert(columnNamesAndDataTypes(result.schema) == 
columnNamesAndDataTypes(schema))
   }
+
+  test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or 
an upsert " +
+    "per deleteCondition") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, false),
+      Row(2, 20L, true),
+      Row(3, 30L, false),
+      Row(4, 40L, true)
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        deleteCondition = Some(F.col("is_delete") === true)
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    // Mutual-exclusivity invariant: each row's CDC metadata struct has 
exactly one of
+    // (deleteSequence, upsertSequence) non-null, and the non-null side 
carries the row's
+    // sequence value.
+    checkAnswer(
+      df = processor.extendMicrobatchRowsWithCdcMetadata(batch),
+      expectedAnswer = Seq(
+        Row(1, 10L, false, Row(null, 10L)),
+        Row(2, 20L, true, Row(20L, null)),
+        Row(3, 30L, false, Row(null, 30L)),
+        Row(4, 40L, true, Row(40L, null))
+      )
+    )
+  }
+
+  test("extendMicrobatchRowsWithCdcMetadata treats null deleteCondition 
results as upserts") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, null)
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        deleteCondition = Some(F.col("is_delete"))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.extendMicrobatchRowsWithCdcMetadata(batch),
+      expectedAnswer = Row(1, 10L, null, Row(null, 10L))
+    )
+  }
+
+  test("extendMicrobatchRowsWithCdcMetadata treats every row as an upsert " +
+    "when deleteCondition is None") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "a"),
+      Row(2, 20L, "b")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        deleteCondition = None
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.extendMicrobatchRowsWithCdcMetadata(batch),
+      expectedAnswer = Seq(
+        Row(1, 10L, "a", Row(null, 10L)),
+        Row(2, 20L, "b", Row(null, 20L))
+      )
+    )
+  }
+
+  test("extendMicrobatchRowsWithCdcMetadata appends CDC metadata as the last 
column") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "a")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.extendMicrobatchRowsWithCdcMetadata(batch)
+
+    // Original columns are preserved in their original order, with CDC 
metadata appended at
+    // the very end.
+    assert(result.schema.fieldNames.toSeq ==
+      schema.fieldNames.toSeq :+ Scd1BatchProcessor.cdcMetadataColName)
+  }
+
+  test("extendMicrobatchRowsWithCdcMetadata casts delete / upsert sequence 
fields to " +
+    "resolvedSequencingType") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      // Microbatch's sequencing column is IntegerType, but the flow's 
resolved sequencing type
+      // will be LongType. This should be upcasted in the projected CDC 
metadata column.
+      .add("seq", IntegerType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10, "a")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val resultDf = processor.extendMicrobatchRowsWithCdcMetadata(batch)
+
+    val cdcMetadataDataType =
+      
resultDf.schema(Scd1BatchProcessor.cdcMetadataColName).dataType.asInstanceOf[StructType]
+    assert(columnNamesAndDataTypes(cdcMetadataDataType) == Seq(
+      Scd1BatchProcessor.cdcDeleteSequenceFieldName -> LongType,
+      Scd1BatchProcessor.cdcUpsertSequenceFieldName -> LongType))
+
+    // The cast must also succeed at runtime: upsertSequence is materialized 
as a Long value, not
+    // an Int.
+    checkAnswer(
+      df = resultDf,
+      expectedAnswer = Row(1, 10, "a", Row(null, 10L))
+    )
+  }
+
+  test("extendMicrobatchRowsWithCdcMetadata fails fast when the microbatch's 
sequencing column " +
+    "is incompatible with resolvedSequencingType") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      // Microbatch's sequencing column is a struct, whereas the flow's 
resolved sequencing type
+      // will be LongType. These are incompatible and should throw.
+      .add(
+        "seq",
+        new StructType()
+          .add("major", LongType)
+          .add("minor", LongType))
+
+    val batch = microbatchOf(schema)(
+      Row(1, Row(1L, 0L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val ex = intercept[AnalysisException] {
+      // .schema forces analysis of the underlying logical plan, surfacing the 
invalid cast.
+      processor.extendMicrobatchRowsWithCdcMetadata(batch).schema
+    }
+    assert(ex.getCondition == "DATATYPE_MISMATCH.CAST_WITHOUT_SUGGESTION")
+  }
+
+  test("extendMicrobatchRowsWithCdcMetadata rejects a microbatch that already 
contains the " +
+    "reserved CDC metadata column") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      val schema = new StructType()
+        .add("id", IntegerType)
+        .add("seq", LongType)
+        .add(Scd1BatchProcessor.cdcMetadataColName, StringType)
+
+      val batch = microbatchOf(schema)(
+        Row(1, 10L, "user-supplied")
+      )
+
+      val processor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("id")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1
+        ),
+        resolvedSequencingType = LongType
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          processor.extendMicrobatchRowsWithCdcMetadata(batch)
+        },
+        condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
+        sqlState = "42710",
+        parameters = Map(
+          "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
+          "columnName" -> Scd1BatchProcessor.cdcMetadataColName,
+          "schemaName" -> "microbatch",
+          "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
+        )
+      )
+    }
+  }
+
+  test("extendMicrobatchRowsWithCdcMetadata rejects reserved CDC metadata 
column " +
+    "case-insensitively") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val conflictingColumnName = 
Scd1BatchProcessor.cdcMetadataColName.toUpperCase(Locale.ROOT)
+      val schema = new StructType()
+        .add("id", IntegerType)
+        .add("seq", LongType)
+        .add(conflictingColumnName, StringType)
+
+      val batch = microbatchOf(schema)(
+        Row(1, 10L, "user-supplied")
+      )
+
+      val processor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("id")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1
+        ),
+        resolvedSequencingType = LongType
+      )
+
+      checkError(
+        exception = intercept[AnalysisException] {
+          processor.extendMicrobatchRowsWithCdcMetadata(batch)
+        },
+        condition = "AUTOCDC_RESERVED_COLUMN_NAME_CONFLICT",
+        sqlState = "42710",
+        parameters = Map(
+          "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+          "columnName" -> conflictingColumnName,
+          "schemaName" -> "microbatch",
+          "reservedColumnName" -> Scd1BatchProcessor.cdcMetadataColName
+        )
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to