This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new cb28819952da [SPARK-56953][SDP] Implement SCD1 Batch Processor; 
foreachBatch Callback
cb28819952da is described below

commit cb28819952da6da9b69bcb3e0ab15da33fc0213f
Author: AnishMahto <[email protected]>
AuthorDate: Sun May 24 18:49:55 2026 -0700

    [SPARK-56953][SDP] Implement SCD1 Batch Processor; foreachBatch Callback
    
    Approved AutoCDC SPIP: 
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
    
    --------
    
    This is a stacked PR. Review incremental diff here: 
https://github.com/AnishMahto/spark/compare/SPARK-56927-SCD1-merge-microbatch-onto-target...SPARK-56953-SCD1-foreachbatch-callback
    
    Incremental diff that includes [SPARK-56953] [SPARK-56927][SPARK-56923]: 
https://github.com/AnishMahto/spark/compare/SPARK-56249-merge-tombstones-onto-microbatch...SPARK-56953-SCD1-foreachbatch-callback
    
    --------
    
    **Preamble:**
    
    The SCD type 1 flow is a foreachBatch streaming query on an input 
change-data-feed, and is responsible for reconciling the incoming change data 
onto some target table that follows SCD1 replication semantics.
    
    SCD1 flows also maintain an "auxiliary" table to keep track of 
the state of early-arriving, out-of-order events. Each microbatch will need to 
reconcile against this auxiliary table as well, and update the auxiliary 
table's state appropriately for future microbatches.
    
    **foreachBatch Callback:**
    
    Implementation of the actual callback that will be passed into the 
foreachBatch streaming query for SCD1 flows.
    
    The callback orchestrates microbatch validation and calling all of the 
`Scd1BatchProcessor` helpers to execute the complete reconciliation of a 
change-data-feed microbatch onto the auxiliary and target tables.
    
    Introduce Scd1ForeachBatchHandlerSuite to exercise E2E business logic unit 
tests. In the future when we hook this up to an actual flow execution, we will 
introduce E2E _integration_ tests that can additionally verify schema 
evolution, interaction with other dataflow graph entities, etc.
    
    Additionally refactor some test helper functions into a shared 
`AutoCdcCatalogExecutionTestBase`.
    
    Closes #56016 from AnishMahto/SPARK-56953-SCD1-foreachbatch-callback.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
    (cherry picked from commit 30bdf0b39fa6b28ea149ddd43ecd962b2b3c0cd2)
    Signed-off-by: DB Tsai <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  23 +
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 191 +++++-
 .../autocdc/Scd1ForeachBatchHandler.scala          |  73 +++
 .../sql/pipelines/autocdc/ScdBatchValidator.scala  | 100 ++++
 .../autocdc/AutoCdcCatalogExecutionTestBase.scala  | 123 ++++
 .../autocdc/Scd1BatchProcessorMergeSuite.scala     | 541 +++++++++++++++++
 .../autocdc/Scd1ForeachBatchHandlerSuite.scala     | 639 +++++++++++++++++++++
 7 files changed, 1686 insertions(+), 4 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index f50c5554b5dc..7632f70698f2 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -203,6 +203,29 @@
     ],
     "sqlState" : "22023"
   },
+  "AUTOCDC_MICROBATCH_VALIDATION" : {
+    "message" : [
+      "AutoCDC flow on table <tableName> in batch <batchId> failed microbatch 
validation."
+    ],
+    "subClass" : {
+      "NON_ORDERABLE_SEQUENCE" : {
+        "message" : [
+          "The sequencing column has non-orderable type <dataType>. The 
sequencing column must be of a type that supports ordering."
+        ]
+      },
+      "NULL_KEY" : {
+        "message" : [
+          "The microbatch contains rows with null values in the following key 
column(s): <nullKeyCounts>. All rows must have non-null values for every key 
column."
+        ]
+      },
+      "NULL_SEQUENCE" : {
+        "message" : [
+          "The microbatch contains <nullCount> row(s) with a null sequencing 
value. All rows must have a non-null sequencing value."
+        ]
+      }
+    },
+    "sqlState" : "22000"
+  },
   "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
     "message" : [
       "Expected a single column identifier; got the multi-part identifier 
<columnName> (parts: <nameParts>)."
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index e80d43b11554..32aebc8924c0 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.pipelines.autocdc
 import org.apache.spark.SparkException
 import org.apache.spark.sql.{functions => F, AnalysisException}
 import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.util.QuotingUtils
 import org.apache.spark.sql.classic.DataFrame
 import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -37,6 +38,45 @@ case class Scd1BatchProcessor(
     changeArgs: ChangeArgs,
     resolvedSequencingType: DataType) {
 
+  /**
+   * Reconcile a CDC microbatch into the canonical form that the auxiliary- 
and target-table
+   * merges consume. Composes the per-step transforms in the only order that 
produces correct
+   * SCD1 semantics:
+   *
+   *   1. [[deduplicateMicrobatch]]: collapse same-key events to the latest by 
sequence.
+   *   2. [[extendMicrobatchRowsWithCdcMetadata]]: project the operational 
`_cdc_metadata` column
+   *      (must run before column selection, which may drop inputs the 
metadata expressions
+   *      reference).
+   *   3. [[projectTargetColumnsOntoMicrobatch]]: apply the user-defined 
column selection while
+   *      preserving the CDC metadata column.
+   *   4. [[applyTombstonesToMicrobatch]]: filter out late-arriving events 
superseded by
+   *      tombstones already recorded in the auxiliary table.
+   *
+   * The per-step methods are kept package-visible so that focused unit tests 
can pin each
+   * transform's behavior independently. This method itself is package-visible 
so that
+   * [[Scd1ForeachBatchHandler]] can call it after running 
[[ScdBatchValidator.validateMicrobatch]]
+   * - validation is intentionally not folded in here, as it must run before 
any of these
+   * transforms touch the data.
+   *
+   * @param batchDf          The validated incoming CDC microbatch.
+   * @param auxiliaryTableDf A snapshot of the auxiliary table for tombstone 
reconciliation.
+   *                         Must contain at minimum the key columns + 
`_cdc_metadata`.
+   * @return The reconciled microbatch, ready to be merged onto both tables.
+   */
+  private[autocdc] def reconcileMicrobatch(
+      batchDf: DataFrame,
+      auxiliaryTableDf: DataFrame): DataFrame = {
+    val deduplicated = deduplicateMicrobatch(validatedMicrobatch = batchDf)
+    val withCdcMetadata = 
extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch = deduplicated)
+    val projected = projectTargetColumnsOntoMicrobatch(
+      microbatchWithCdcMetadataDf = withCdcMetadata
+    )
+    applyTombstonesToMicrobatch(
+      microbatchDf = projected,
+      auxiliaryTableDf = auxiliaryTableDf
+    )
+  }
+
   /**
    * Deduplicate the incoming CDC microbatch by key, keeping the most recent 
event per key
    * as ordered by [[ChangeArgs.sequencing]].
@@ -51,7 +91,7 @@ case class Scd1BatchProcessor(
    *
    * The schema of the returned dataframe matches the schema of the microbatch 
exactly.
    */
-  def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = {
+  private[autocdc] def deduplicateMicrobatch(validatedMicrobatch: DataFrame): 
DataFrame = {
     // The `max_by` API can only return a single column, so pack/unpack the 
entire row into a
     // temporary column before and after the `max_by` operation.
     val winningRowCol = Scd1BatchProcessor.winningRowColName
@@ -88,7 +128,8 @@ case class Scd1BatchProcessor(
    * The returned dataframe has all of the columns in the input microbatch + 
the CDC metadata
    * column.
    */
-  def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame): 
DataFrame = {
+  private[autocdc] def extendMicrobatchRowsWithCdcMetadata(
+      validatedMicrobatch: DataFrame): DataFrame = {
     // Proactively validate the reserved CDC metadata column does not exist in 
the microbatch.
     validateCdcMetadataColumnNotPresent(validatedMicrobatch)
 
@@ -123,7 +164,8 @@ case class Scd1BatchProcessor(
    * Returned dataframe's schema is: all of the user-selected columns in the 
input dataframe as per
    * [[ChangeArgs.columnSelection]] + the CDC metadata column.
    */
-  def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: 
DataFrame): DataFrame = {
+  private[autocdc] def projectTargetColumnsOntoMicrobatch(
+      microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
     val caseSensitiveColumnComparison =
       
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
 
@@ -178,7 +220,7 @@ case class Scd1BatchProcessor(
    * The returned filtered dataframe has the same schema as the input 
microbatch, but with only
    * the rows that remain unaffected by any known tombstones.
    */
-  def applyTombstonesToMicrobatch(
+  private[autocdc] def applyTombstonesToMicrobatch(
       microbatchDf: DataFrame,
       auxiliaryTableDf: DataFrame): DataFrame = {
     val aliasedMicrobatchDf = microbatchDf.alias("microbatch")
@@ -212,6 +254,147 @@ case class Scd1BatchProcessor(
     )
   }
 
+  /**
+   * Merge the reconciled (deduplicated per key) microbatch onto the auxiliary 
table,
+   * advancing or deleting existing tombstones and inserting new tombstones 
for previously
+   * untracked keys.
+   *
+   * After the merge, the auxiliary table has the same schema as before, but 
with the latest
+   * tombstone data per key.
+   *
+   * @param reconciledMicrobatchDf   The deduplicated microbatch.
+   * @param auxiliaryTableIdentifier The identifier of the auxiliary table.
+   */
+  private[autocdc] def mergeMicrobatchOntoAuxiliaryTable(
+      reconciledMicrobatchDf: DataFrame,
+      auxiliaryTableIdentifier: TableIdentifier
+  ): Unit = {
+    val auxIdentQuoted = auxiliaryTableIdentifier.quotedString
+    val meta = Scd1BatchProcessor.cdcMetadataColName
+
+    // Project the reconciled microbatch down to just keys + `_cdc_metadata`; 
data columns are
+    // irrelevant for the auxiliary table and should not be persisted.
+    val reducedMicrobatch = reconciledMicrobatchDf
+      .select(changeArgs.keys.map(k => F.col(k.quoted)) :+ F.col(meta): _*)
+      .as("reducedMicrobatch")
+
+    val microbatchCdcMetadata: Column = F.col(s"reducedMicrobatch.`$meta`")
+    val incomingDelete: Column = 
Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata)
+    val incomingUpsert: Column = 
Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata)
+
+    val auxCdcMetadata: Column = F.col(s"$auxIdentQuoted.`$meta`")
+    val auxDelete: Column = Scd1BatchProcessor.deleteSequenceOf(auxCdcMetadata)
+
+    val doKeysMatch = changeArgs.keys
+      .map(k => F.col(s"reducedMicrobatch.${k.quoted}") === 
F.col(s"$auxIdentQuoted.${k.quoted}"))
+      .reduce(_ && _)
+
+    val incomingRowRepresentsDeleteEvent =
+      incomingDelete.isNotNull && (incomingUpsert.isNull || incomingDelete > 
incomingUpsert)
+
+    reducedMicrobatch
+      .mergeInto(auxIdentQuoted, doKeysMatch)
+      // Incoming delete is newer than the stored one: advance the high-water 
mark.
+      .whenMatched(
+        incomingRowRepresentsDeleteEvent && incomingDelete > auxDelete
+      )
+      .update(Map(s"$auxIdentQuoted.`$meta`" -> microbatchCdcMetadata))
+      // Incoming upsert is newer than the stored delete: the key was 
re-inserted after the
+      // delete, so the aux tombstone is stale - remove it to prevent 
unbounded growth.
+      .whenMatched(
+        !incomingRowRepresentsDeleteEvent && incomingUpsert >= auxDelete
+      )
+      .delete()
+      // New delete for a key not yet tracked, add it to auxiliary table. Note 
that in the
+      // reconciled microbatch, there is at most one event per key, which 
represents the latest
+      // known event for the key. If the latest known event is a delete, it 
must be a tombstone.
+      .whenNotMatched(incomingRowRepresentsDeleteEvent)
+      .insertAll()
+      .merge()
+  }
+
+  /**
+   * Merge the reconciled (deduplicated, tombstone applied, and column 
selection + metadata
+   * column projected) microbatch onto the target table, as per SCD1 semantics.
+   *
+   * Microbatch invariants:
+   *   - Exactly one of {upsert, delete} version is non-null, the other is 
null.
+   *   - There is at most one event per key, representing the latest known 
event for the key
+   *     across the microbatch and auxiliary table.
+   *
+   * Target table invariants:
+   *   - Target table only contains live rows; delete sequence is always null, 
upsert sequence
+   *     is always non-null.
+   *
+   * @param reconciledMicrobatchDf The reconciled microbatch dataframe.
+   * @param targetTableIdentifier  The identifier of the target table.
+   */
+  private[autocdc] def mergeMicrobatchOntoTarget(
+      reconciledMicrobatchDf: DataFrame,
+      targetTableIdentifier: TableIdentifier
+  ): Unit = {
+    val meta = Scd1BatchProcessor.cdcMetadataColName
+
+    val destinationTableStr = targetTableIdentifier.quotedString
+    // (Re-)alias the reconciled microbatch DF for easy reference for the 
remainder of the merge.
+    val microbatchDf = reconciledMicrobatchDf.as("microbatch")
+
+    val microbatchCdcMetadataCol = F.col(s"microbatch.`$meta`")
+    val destinationCdcMetadataCol =
+      F.col(s"$destinationTableStr.`$meta`")
+
+    val microbatchDeleteVersionField =
+      Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadataCol)
+    val microbatchUpsertVersionField =
+      Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadataCol)
+    val destinationUpsertVersionField =
+      Scd1BatchProcessor.upsertSequenceOf(destinationCdcMetadataCol)
+
+    val keysMatch = changeArgs.keys
+      .map(k =>
+        F.col(s"microbatch.${k.quoted}") === 
F.col(s"$destinationTableStr.${k.quoted}")
+      )
+      .reduce(_ && _)
+
+    // Upsert beats existing row if incoming upsert sequence is geq to the 
upsert sequence on
+    // the target.
+    val incomingWinsUpsert = microbatchUpsertVersionField.isNotNull &&
+      microbatchUpsertVersionField >= destinationUpsertVersionField
+
+    // Delete beats existing row if delete sequencing is strictly greater than 
the upsert
+    // sequence on the target. This is an arbitrary but deliberate choice to 
maintain that
+    // upserts get priority over deletes on duplicate sequencing.
+    val incomingWinsDelete = microbatchDeleteVersionField.isNotNull &&
+      microbatchDeleteVersionField > destinationUpsertVersionField
+
+    // When the incoming upsert wins against an existing record, the entire 
row (all columns)
+    // will be overwritten, including the CDC metadata column. We only exclude 
keys because
+    // most merge implementations require that join columns are not being 
mutated, even if
+    // the mutation is a no-op.
+    val resolver = microbatchDf.sparkSession.sessionState.conf.resolver
+    val keyNames = changeArgs.keys.map(_.name)
+    val columnsToUpdateWhenIncomingWinsUpsert: Map[String, Column] =
+      microbatchDf.columns
+        .filterNot(c => keyNames.exists(resolver(_, c)))
+        .map { c =>
+          val quotedCol = QuotingUtils.quoteIdentifier(c)
+          s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
+        }
+        .toMap
+
+    microbatchDf
+      .mergeInto(destinationTableStr, keysMatch)
+      .whenMatched(incomingWinsDelete)
+      .delete()
+      .whenMatched(incomingWinsUpsert)
+      .update(columnsToUpdateWhenIncomingWinsUpsert)
+      // New key: only insert upserts; deletes for absent keys are no-ops for 
the target table
+      // merge, and instead would have been inserted as tombstones into the 
auxiliary table.
+      .whenNotMatched(microbatchDeleteVersionField.isNull)
+      .insertAll()
+      .merge()
+  }
+
   private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit 
= {
     val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
     val resolver = microbatchSqlConf.resolver
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandler.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandler.scala
new file mode 100644
index 000000000000..c286f26c8263
--- /dev/null
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandler.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+
+/**
+ * Exposes an API to execute one SCD Type 1 AutoCDC microbatch reconciliation 
on a
+ * foreachBatch streaming query.
+ */
+case class Scd1ForeachBatchHandler(
+    batchProcessor: Scd1BatchProcessor,
+    auxiliaryTableIdentifier: TableIdentifier,
+    targetTableIdentifier: TableIdentifier) {
+
+  /**
+   * Process a single CDC microbatch and merge it into the auxiliary and 
target tables.
+   *
+   * Idempotent under same-`batchId` replay: both merges are gated on sequence 
inequalities,
+   * so a partial failure between them is reconciled correctly when 
foreachBatch retries the
+   * whole batch.
+   */
+  def execute(batchDf: DataFrame, batchId: Long): Unit = {
+    ScdBatchValidator(
+      destinationIdentifier = targetTableIdentifier,
+      changeArgs = batchProcessor.changeArgs,
+      batchDf = batchDf,
+      batchId = batchId
+    ).validateMicrobatch()
+
+    val reconciledMicrobatch = batchProcessor.reconcileMicrobatch(
+      batchDf = batchDf,
+      // Aux holds at most one row per currently-active tombstone (revived 
keys are GC'd
+      // by mergeMicrobatchOntoAuxiliaryTable), so it generally stays small 
enough for a broadcast
+      // join. Future optimizations: key-pruned reads, table format-aware 
clustering and tombstone
+      // TTL.
+      auxiliaryTableDf = batchDf.sparkSession.read.table(
+        auxiliaryTableIdentifier.quotedString
+      )
+    )
+
+    batchProcessor.mergeMicrobatchOntoAuxiliaryTable(
+      reconciledMicrobatchDf = reconciledMicrobatch,
+      auxiliaryTableIdentifier = auxiliaryTableIdentifier
+    )
+
+    // Failure between these two merges is safe under foreachBatch retry: the 
aux merge
+    // only ever mutates a tombstone when this batch's event makes it stale 
(strictly newer
+    // delete advances it) or redundant (`>=` upsert revives the key, GC'ing 
the tombstone),
+    // so on retry those preconditions no longer hold against the 
just-advanced aux state -
+    // the aux merge is a no-op and the target merge replays as if for the 
first time.
+    batchProcessor.mergeMicrobatchOntoTarget(
+      reconciledMicrobatchDf = reconciledMicrobatch,
+      targetTableIdentifier = targetTableIdentifier
+    )
+  }
+}
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ScdBatchValidator.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ScdBatchValidator.scala
new file mode 100644
index 000000000000..0d2f47d1c4a6
--- /dev/null
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ScdBatchValidator.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Column}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.RowOrdering
+import org.apache.spark.sql.classic.DataFrame
+
+/**
+ * Per-microbatch input validation shared by SCD merge executors. Throws with 
a clear,
+ * user-actionable error if the batch violates the CDC contract.
+ *
+ * @param destinationIdentifier The identifier of the target table, used for 
error messages.
+ * @param changeArgs The user-specified AutoCDC parameters.
+ * @param batchDf The incoming microbatch to validate.
+ * @param batchId The structured-streaming batch id, used for error messages.
+ */
+case class ScdBatchValidator(
+    destinationIdentifier: TableIdentifier,
+    changeArgs: ChangeArgs,
+    batchDf: DataFrame,
+    batchId: Long) {
+
+  /**
+   * Validates that the sequencing column is orderable and that no row has a 
null sequencing
+   * value or a null value in any key column. The per-row checks are folded 
into a single
+   * aggregation so the microbatch is scanned exactly once.
+   */
+  def validateMicrobatch(): Unit = {
+    val seqType = batchDf.select(changeArgs.sequencing).schema.head.dataType
+    if (!RowOrdering.isOrderable(seqType)) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_MICROBATCH_VALIDATION.NON_ORDERABLE_SEQUENCE",
+        messageParameters = Map(
+          "tableName" -> destinationIdentifier.quotedString,
+          "batchId" -> batchId.toString,
+          "dataType" -> seqType.catalogString
+        )
+      )
+    }
+
+    val sequencingNullCount: Column =
+      F.count(F.when(changeArgs.sequencing.isNull, 
F.lit(1))).as("__autocdc_seq_null_count")
+    val perKeyNullCount: Seq[Column] = changeArgs.keys.map { key =>
+      F.count(F.when(F.col(key.quoted).isNull, F.lit(1)))
+        .as(s"__autocdc_key_null_count_${key.name}")
+    }
+    // The null count aggregations are laid out in the returned dataframe as:
+    // [# rows with null sequence, # rows with null for key1, ..., # rows with 
null for keyN].
+    val nullCountsResultDf =
+      batchDf.agg(sequencingNullCount, perKeyNullCount: _*).head()
+
+    val numRowsWithNullSequence = nullCountsResultDf.getLong(0)
+    if (numRowsWithNullSequence > 0) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_MICROBATCH_VALIDATION.NULL_SEQUENCE",
+        messageParameters = Map(
+          "tableName" -> destinationIdentifier.quotedString,
+          "batchId" -> batchId.toString,
+          "nullCount" -> numRowsWithNullSequence.toString
+        )
+      )
+    }
+
+    val keysWithNullEntries = changeArgs.keys.zipWithIndex.flatMap { case 
(key, idx) =>
+      val rowCountForKey = nullCountsResultDf.getLong(idx + 1)
+      Option.when(rowCountForKey > 0)(key -> rowCountForKey)
+    }
+    if (keysWithNullEntries.nonEmpty) {
+      val nullKeyCounts = keysWithNullEntries
+        .map { case (key, count) => s"${key.quoted}=$count" }
+        .mkString(", ")
+
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_MICROBATCH_VALIDATION.NULL_KEY",
+        messageParameters = Map(
+          "tableName" -> destinationIdentifier.quotedString,
+          "batchId" -> batchId.toString,
+          "nullKeyCounts" -> nullKeyCounts
+        )
+      )
+    }
+  }
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
new file mode 100644
index 000000000000..0dc0a9027660
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.connector.catalog.{Identifier, TableInfo}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import 
org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{DataType, LongType, StructType}
+
+/**
+ * Shared test infrastructure for AutoCDC suites that exercise execution paths 
performing
+ * v2 `MERGE INTO` operations against an in-memory catalog. Provides:
+ *
+ *   - A pre-configured [[InMemoryRowLevelOperationTableCatalog]] registered 
before each test
+ *     and reset after each test.
+ *   - Stable v2 [[Identifier]] and Catalyst [[TableIdentifier]] values for an 
auxiliary table
+ *     and a target table.
+ *   - Schema-agnostic primitives: table creation, microbatch [[DataFrame]] 
construction,
+ *     and CDC metadata helpers parameterized by sequencing type.
+ *
+ * Suites that mix this in are responsible for defining their own schemas 
(auxiliary, target,
+ * source) and processor / exec instances, then writing thin wrappers around 
[[createTable]]
+ * to seed those schemas.
+ */
+trait AutoCdcCatalogExecutionTestBase {
+  this: SharedSparkSession with BeforeAndAfter =>
+
+  protected val catalogName: String = "cat"
+  protected val namespace: String = "ns1"
+  protected val auxTableName: String = "aux_table"
+  protected val targetTableName: String = "target_table"
+
+  /** Default DSv2 [[Identifier]] for the auxiliary table. */
+  protected val defaultAuxIdent: Identifier = Identifier.of(Array(namespace), 
auxTableName)
+  /** Default DSv2 [[Identifier]] for the target table. */
+  protected val defaultTargetIdent: Identifier = 
Identifier.of(Array(namespace), targetTableName)
+
+  /** Default catalyst three-part [[TableIdentifier]] for the auxiliary table. 
*/
+  protected val defaultAuxTableIdentifier: TableIdentifier = TableIdentifier(
+    table = auxTableName,
+    database = Some(namespace),
+    catalog = Some(catalogName)
+  )
+  /** Default catalyst three-part [[TableIdentifier]] for the target table. */
+  protected val defaultTargetTableIdentifier: TableIdentifier = 
TableIdentifier(
+    table = targetTableName,
+    database = Some(namespace),
+    catalog = Some(catalogName)
+  )
+
+  before {
+    spark.conf.set(
+      s"spark.sql.catalog.$catalogName",
+      classOf[InMemoryRowLevelOperationTableCatalog].getName
+    )
+  }
+
+  after {
+    spark.sessionState.catalogManager.reset()
+    spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$catalogName")
+  }
+
+  /**
+   * Schema of the [[Scd1BatchProcessor.cdcMetadataColName]] struct column for 
a given
+   * sequencing column type. Defaults to [[LongType]] because all current SCD1 
tests use
+   * `Long` sequencing.
+   */
+  protected def cdcMetadataColSchemaType(sequencingType: DataType = LongType): 
StructType =
+    new StructType()
+      .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, sequencingType)
+      .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, sequencingType)
+
+  /**
+   * Build a [[Row]] matching the [[Scd1BatchProcessor.cdcMetadataColName]] 
struct's two fields,
+   * in the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]].
+   */
+  protected def cdcMetadataRow[T](deleteSeq: Option[T], upsertSeq: Option[T]): 
Row =
+    Row(deleteSeq.getOrElse(null), upsertSeq.getOrElse(null))
+
+  /**
+   * Create a table in the test catalog under the given DSv2 [[Identifier]] 
using `schema`,
+   * optionally seeding it with `seedRows`. Pass no rows to create an empty 
table.
+   */
+  protected def createTable(
+      ident: Identifier,
+      tableIdentifier: TableIdentifier,
+      schema: StructType,
+      seedRows: Row*): Unit = {
+    spark.sessionState.catalogManager
+      .catalog(catalogName)
+      .asTableCatalog
+      .createTable(ident, new TableInfo.Builder().withSchema(schema).build())
+
+    if (seedRows.nonEmpty) {
+      microbatchOf(schema)(seedRows: 
_*).writeTo(tableIdentifier.quotedString).append()
+    }
+  }
+
+  /** Build a microbatch [[DataFrame]] from explicit `rows` and an explicit 
`schema`. */
+  protected def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
+    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
new file mode 100644
index 000000000000..475d25f5aa2c
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Row}
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Tests for [[Scd1BatchProcessor]] methods that perform a `MERGE INTO` 
against a registered
+ * v2 table. These tests require a v2 catalog that supports row-level 
operations
+ * (set up by [[AutoCdcCatalogExecutionTestBase]]) and run actual writes 
through Catalyst's
+ * row-level-operations machinery, so they are kept separate from the 
pure-DataFrame-transform
+ * tests in [[Scd1BatchProcessorSuite]].
+ */
+class Scd1BatchProcessorMergeSuite
+    extends QueryTest
+    with SharedSparkSession
+    with BeforeAndAfter
+    with AutoCdcCatalogExecutionTestBase {
+
+  /**
+   * Minimal valid shape for both the auxiliary table and microbatch inputs in 
these tests:
+   * a single key column `id` plus the CDC metadata struct. The auxiliary 
table genuinely
+   * has only this shape in production, and the merge function reduces its 
microbatch input
+   * down to keys + `_cdc_metadata` regardless of incoming data columns -- so 
most tests can
+   * use this single schema for both ends.
+   */
+  private val minimalSchema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+  /** Minimal target-table shape: one key, one data column, and CDC metadata. 
*/
+  private val targetSchema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add("value", StringType)
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+  /**
+   * A processor with a single key column `id`. `sequencing` is irrelevant for
+   * merge functions in this suite: they operate entirely on the 
already-computed CDC metadata
+   * column, never on the raw sequencing expression.
+   */
+  private val processor = Scd1BatchProcessor(
+    changeArgs = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.lit(0L),
+      storedAsScdType = ScdType.Type1
+    ),
+    resolvedSequencingType = LongType
+  )
+
+  /** Create the auxiliary table using [[minimalSchema]], optionally seeded 
with `seedRows`. */
+  private def createAuxTable(seedRows: Row*): Unit =
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, minimalSchema, 
seedRows: _*)
+
+  /** Create the target table using [[targetSchema]], optionally seeded with 
`seedRows`. */
+  private def createTargetTable(seedRows: Row*): Unit =
+    createTable(defaultTargetIdent, defaultTargetTableIdentifier, 
targetSchema, seedRows: _*)
+
+  /**
+   * Build an auxiliary-table schema with the given key columns followed by 
the standard CDC
+   * metadata struct. Used by tests that need a non-trivial key shape 
(composite or dotted).
+   */
+  private def customKeyAuxSchema(keyColumns: Seq[(String, DataType)]): 
StructType = {
+    val withKeys = keyColumns.foldLeft(new StructType()) { case (s, (name, 
dt)) =>
+      s.add(name, dt)
+    }
+    withKeys.add(Scd1BatchProcessor.cdcMetadataColName, 
cdcMetadataColSchemaType())
+  }
+
+  /**
+   * Create the auxiliary table at [[defaultAuxIdent]] using `schema` and 
optionally seed it
+   * with `seedRows`. Used by tests that need a non-trivial key shape 
(composite or dotted).
+   */
+  private def createAuxTableWithSchema(schema: StructType, seedRows: Row*): 
Unit =
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, schema, seedRows: 
_*)
+
+  /**
+   * `(name, dataType)` pairs of `schema`'s fields, used to compare two 
schemas for structural
+   * equivalence while deliberately ignoring nullability and metadata.
+   */
+  private def columnNamesAndDataTypes(schema: StructType): Seq[(String, 
DataType)] =
+    schema.fields.map(f => (f.name, f.dataType)).toSeq
+
+  // =============== mergeMicrobatchOntoAuxiliaryTable tests ===============
+
+  test("mergeMicrobatchOntoAuxiliaryTable replaces an existing tombstone with 
a newer " +
+    "microbatch tombstone, dropping any microbatch-only data columns") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = 
None)))
+
+    // The microbatch carries an extra `value` data column that has no place 
in the auxiliary
+    // table. mergeMicrobatchOntoAuxiliaryTable must project it away before 
merging, both to
+    // satisfy MergeIntoTable's schema requirements and to keep the auxiliary 
table free of
+    // unrelated columns.
+    val microbatchSchema = new StructType()
+      .add("id", IntegerType)
+      .add("value", StringType)
+      .add(
+        Scd1BatchProcessor.cdcMetadataColName,
+        new StructType()
+          .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+          .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
+      )
+    val microbatch = microbatchOf(microbatchSchema)(
+      Row(1, "data-leak", cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = 
None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    // Tombstone advanced to delete=20, with exactly one row per key (no 
duplicate tombstones).
+    checkAnswer(resultAuxTable, Row(1, Row(20L, null)))
+    // Schema strictly matches minimalSchema; the `value` column was dropped, 
not smuggled in.
+    assert(columnNamesAndDataTypes(resultAuxTable.schema) == 
columnNamesAndDataTypes(minimalSchema))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable deletes an existing tombstone when 
superseded by a " +
+    "newer microbatch upsert") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = 
None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    assert(resultAuxTable.collect().isEmpty)
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable inserts a new tombstone for a 
previously-untracked " +
+    "key") {
+    createAuxTable()
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    checkAnswer(resultAuxTable, Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable leaves rows for unrelated keys 
untouched") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = 
None)))
+
+    // Microbatch event affects a different key entirely; the existing 
tombstone for id=1 must
+    // not be touched even though the new tombstone's sequence is much larger.
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(2, cdcMetadataRow(deleteSeq = Some(100L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    checkAnswer(resultAuxTable, Seq(
+      Row(1, Row(10L, null)),
+      Row(2, Row(100L, null))
+    ))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch deletes whose 
sequence is older " +
+    "than the existing tombstone") {
+    // This documents that mergeMicrobatchOntoAuxiliaryTable's contract is 
stronger than just
+    // relying on applyTombstonesToMicrobatch having filtered out stale events 
upstream: even
+    // an unfiltered stale incoming delete must not regress the high-water 
mark.
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = 
None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(5L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    checkAnswer(resultAuxTable, Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch upserts whose 
sequence is older " +
+    "than the existing tombstone") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = 
None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    checkAnswer(resultAuxTable, Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable applies the tied-sequence asymmetry: 
equal deletes " +
+    "are kept, equal upserts delete the tombstone") {
+    // On a delete<->upsert sequencing tie, upsert events are given priority 
over deletes;
+    // therefore an incoming upsert with the same sequence as a tombstone 
should delete the
+    // tombstone. On a delete<->delete sequencing tie, the effect is a no-op. 
This is an
+    // internal SCD1 tie-breaking convention, not a publicly documented 
contract.
+    createAuxTable(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)),
+      Row(2, cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = None))
+    )
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)),
+      Row(2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    // Row 1's tombstone remains the same, but row 2's tombstone should be 
marked as stale and
+    // deleted.
+    checkAnswer(resultAuxTable, Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable upsert event for different key does 
not affect " +
+    "tombstone") {
+    createAuxTable(Row(2, cdcMetadataRow(deleteSeq = Some(5L), upsertSeq = 
None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      // Although the upsert seq is 10, this is for key=1; tombstone for key=2 
should be unaffected.
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    checkAnswer(resultAuxTable, Row(2, Row(5L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable is idempotent across a microbatch 
that exercises " +
+    "every merge clause") {
+    // The auxiliary table starts with three tombstones; the microbatch then 
exercises every
+    // merge clause simultaneously:
+    //   - id=1: aux tombstone superseded by a microbatch upsert
+    //   - id=2: aux tombstone advanced by a newer microbatch delete
+    //   - id=3: untouched by the microbatch
+    //   - id=4: new tombstone for an untracked key
+    createAuxTable(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)),
+      Row(2, cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = None)),
+      Row(3, cdcMetadataRow(deleteSeq = Some(30L), upsertSeq = None))
+    )
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15L))),
+      Row(2, cdcMetadataRow(deleteSeq = Some(25L), upsertSeq = None)),
+      Row(4, cdcMetadataRow(deleteSeq = Some(40L), upsertSeq = None))
+    )
+
+    val expectedAfterMerge = Seq(
+      Row(2, Row(25L, null)),
+      Row(3, Row(30L, null)),
+      Row(4, Row(40L, null))
+    )
+
+    // First merge applies all three clauses exactly once.
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+    val auxTableAfterFirstMerge = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    checkAnswer(auxTableAfterFirstMerge, expectedAfterMerge)
+
+    // Re-applying the same microbatch is a no-op:
+    //   - id=1 is absent from aux; whenNotMatched is gated on delete events 
=> skipped.
+    //   - id=2 has tied delete (incoming==aux); strict `>` in the update 
clause fails.
+    //   - id=4 has tied delete (incoming==aux); same reason.
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+    val auxTableAfterSecondMerge = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    checkAnswer(auxTableAfterSecondMerge, expectedAfterMerge)
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable correctly inserts tombstones for 
composite key") {
+    // Composite key: (region, customer_id). The merge join condition is the 
AND of every key
+    // column equality, so an aux row sharing only `region` with the 
microbatch must NOT be
+    // touched, while the microbatch row must be inserted as a new tombstone.
+    val compositeSchema = customKeyAuxSchema(Seq(
+      "region" -> StringType,
+      "customer_id" -> IntegerType
+    ))
+    createAuxTableWithSchema(
+      compositeSchema,
+      Row("US", 99, cdcMetadataRow(deleteSeq = Some(50L), upsertSeq = None))
+    )
+
+    val compositeKeyProcessor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("region"), 
UnqualifiedColumnName("customer_id")),
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val microbatch = microbatchOf(compositeSchema)(
+      Row("US", 1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None))
+    )
+
+    compositeKeyProcessor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    checkAnswer(spark.read.table(defaultAuxTableIdentifier.quotedString), Seq(
+      Row("US", 99, Row(50L, null)),
+      Row("US", 1, Row(10L, null))
+    ))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable correctly merges for 
backticked/dotted keys") {
+    // Even though the column is a backticked identifier in user-facing DDL, 
Spark drops the
+    // backticks during schema resolution so the field name is the literal 
`user.id`. The merge
+    // path must propagate the user's quoted identifier through `k.quoted` so 
the join condition
+    // and update target both resolve to the same physical column.
+    val dottedKeySchema = customKeyAuxSchema(Seq("user.id" -> IntegerType))
+    createAuxTableWithSchema(
+      dottedKeySchema,
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None))
+    )
+
+    val dottedKeyProcessor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("`user.id`")),
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    // We expect the existing tombstone with del seq=10 to be advanced to 20 
if the merge matches
+    // dotted keys correctly.
+    val microbatch = microbatchOf(dottedKeySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = None))
+    )
+
+    dottedKeyProcessor.mergeMicrobatchOntoAuxiliaryTable(microbatch, 
defaultAuxTableIdentifier)
+
+    checkAnswer(spark.read.table(defaultAuxTableIdentifier.quotedString), 
Row(1, Row(20L, null)))
+  }
+
+  // =============== mergeMicrobatchOntoTarget tests ===============
+
+  test("mergeMicrobatchOntoTarget updates an existing row with a newer 
upsert") {
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq 
= Some(10L))))
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, 
defaultTargetTableIdentifier)
+
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(resultTargetTable, Row(1, "new", Row(null, 20L)))
+    assert(columnNamesAndDataTypes(resultTargetTable.schema) ==
+      columnNamesAndDataTypes(targetSchema))
+  }
+
+  test("mergeMicrobatchOntoTarget deletes an existing row with a newer 
delete") {
+    createTargetTable(
+      Row(1, "delete-me", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(10L))),
+      Row(2, "keep-me", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(20L)))
+    )
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "unused", cdcMetadataRow(deleteSeq = Some(15L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, 
defaultTargetTableIdentifier)
+
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(resultTargetTable, Row(2, "keep-me", Row(null, 20L)))
+  }
+
+  test("mergeMicrobatchOntoTarget inserts new upserts but not new (tombstone) 
deletes") {
+    createTargetTable(Row(1, "existing", cdcMetadataRow(deleteSeq = None, 
upsertSeq = Some(10L))))
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(2, "insert-me", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(20L))),
+      Row(3, "do-not-insert", cdcMetadataRow(deleteSeq = Some(30L), upsertSeq 
= None))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, 
defaultTargetTableIdentifier)
+
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(resultTargetTable, Seq(
+      Row(1, "existing", Row(null, 10L)),
+      Row(2, "insert-me", Row(null, 20L))
+    ))
+  }
+
+  test("mergeMicrobatchOntoTarget ignores stale upserts and stale deletes") {
+    createTargetTable(
+      Row(1, "target-delete-tie", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(10L))),
+      Row(2, "target-newer", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(20L)))
+    )
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "delete-tie", cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = 
None)),
+      Row(2, "older-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(15L)))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, 
defaultTargetTableIdentifier)
+
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(resultTargetTable, Seq(
+      Row(1, "target-delete-tie", Row(null, 10L)),
+      Row(2, "target-newer", Row(null, 20L))
+    ))
+  }
+
+  test("mergeMicrobatchOntoTarget gives tied upserts priority over the target 
row") {
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq 
= Some(10L))))
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "same-sequence-upsert", cdcMetadataRow(deleteSeq = None, 
upsertSeq = Some(10L)))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, 
defaultTargetTableIdentifier)
+
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(resultTargetTable, Row(1, "same-sequence-upsert", Row(null, 
10L)))
+  }
+
+  test("mergeMicrobatchOntoTarget correctly matches escaped key column names") 
{
+    // The raw key name contains special characters that would require being 
escaped on name
+    // resolution.
+    val rawKeyName = "a`b"
+    val schemaWithSpecialKeyCharacters = new StructType()
+      // The schema stores the unquoted (backtick-consumed) column name, so 
use the raw name here.
+      .add(rawKeyName, IntegerType)
+      .add("value", StringType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+    createTable(
+      defaultTargetIdent,
+      defaultTargetTableIdentifier,
+      schemaWithSpecialKeyCharacters,
+      Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    )
+
+    val processorForCustomKeySchema = processor.copy(
+      changeArgs = processor.changeArgs.copy(
+        keys = 
Seq(UnqualifiedColumnName(QuotingUtils.quoteIdentifier(rawKeyName)))
+      )
+    )
+    val microbatch = microbatchOf(schemaWithSpecialKeyCharacters)(
+      Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processorForCustomKeySchema.mergeMicrobatchOntoTarget(
+      microbatch,
+      defaultTargetTableIdentifier
+    )
+
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(resultTargetTable, Row(1, "new", Row(null, 20L)))
+  }
+
+  gridTest(
+    "mergeMicrobatchOntoTarget key column comparison respects spark session 
case sensitivity"
+  )(Seq(false, true)) { caseSensitive =>
+    withSQLConf("spark.sql.caseSensitive" -> caseSensitive.toString) {
+      createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, 
upsertSeq = Some(10L))))
+
+      val processorWithUpperCaseKey = processor.copy(
+        changeArgs = processor.changeArgs.copy(
+          keys = Seq(UnqualifiedColumnName("ID"))
+        )
+      )
+
+      val microbatch = microbatchOf(targetSchema)(
+        Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+      )
+
+      if (caseSensitive) {
+        val ex = intercept[AnalysisException] {
+          processorWithUpperCaseKey.mergeMicrobatchOntoTarget(
+            microbatch,
+            defaultTargetTableIdentifier
+          )
+        }
+        // Intentionally not using checkError here, to avoid asserting on a 
brittle query context
+        // and long message parameters list.
+        assert(ex.errorClass.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"))
+      } else {
+        processorWithUpperCaseKey.mergeMicrobatchOntoTarget(
+          microbatch,
+          defaultTargetTableIdentifier
+        )
+        val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+        checkAnswer(resultTargetTable, Row(1, "new", Row(null, 20L)))
+      }
+    }
+  }
+
+  test("mergeMicrobatchOntoTarget is idempotent across a microbatch") {
+    createTargetTable(
+      Row(1, "delete-me", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(10L))),
+      Row(2, "update-me", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(20L))),
+      Row(3, "untouched", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(30L)))
+    )
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "delete-event", cdcMetadataRow(deleteSeq = Some(15L), upsertSeq = 
None)),
+      Row(2, "updated", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(25L))),
+      Row(4, "inserted", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(40L))),
+      Row(5, "absent-delete", cdcMetadataRow(deleteSeq = Some(50L), upsertSeq 
= None))
+    )
+
+    val expectedAfterMerge = Seq(
+      Row(2, "updated", Row(null, 25L)),
+      Row(3, "untouched", Row(null, 30L)),
+      Row(4, "inserted", Row(null, 40L))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, 
defaultTargetTableIdentifier)
+    val targetTableAfterFirstMerge = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(targetTableAfterFirstMerge, expectedAfterMerge)
+
+    processor.mergeMicrobatchOntoTarget(microbatch, 
defaultTargetTableIdentifier)
+    val targetTableAfterSecondMerge = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(targetTableAfterSecondMerge, expectedAfterMerge)
+  }
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
new file mode 100644
index 000000000000..76790847ede5
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * End-to-end tests for [[Scd1ForeachBatchHandler]] against catalog-backed aux and target tables.
+ */
+class Scd1ForeachBatchHandlerSuite
+    extends QueryTest
+    with SharedSparkSession
+    with BeforeAndAfter
+    with AutoCdcCatalogExecutionTestBase {
+
+  private val sourceSchema = new StructType()
+    .add("id", IntegerType)
+    .add("value", StringType)
+    .add("seq", LongType)
+    .add("is_delete", BooleanType)
+
+  private val auxiliarySchema = new StructType()
+    .add("id", IntegerType)
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+  private val targetSchema = new StructType()
+    .add("id", IntegerType)
+    .add("value", StringType)
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+  private val processor = Scd1BatchProcessor(
+    changeArgs = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.col("seq"),
+      storedAsScdType = ScdType.Type1,
+      deleteCondition = Some(F.col("is_delete")),
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("value"))
+        )
+      )
+    ),
+    resolvedSequencingType = LongType
+  )
+
+  /** Create the auxiliary table using [[auxiliarySchema]], optionally seeded 
with `seedRows`. */
+  private def createAuxTable(seedRows: Row*): Unit =
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, auxiliarySchema, 
seedRows: _*)
+
+  /** Create the target table using [[targetSchema]], optionally seeded with 
`seedRows`. */
+  private def createTargetTable(seedRows: Row*): Unit =
+    createTable(defaultTargetIdent, defaultTargetTableIdentifier, 
targetSchema, seedRows: _*)
+
+  private def exec: Scd1ForeachBatchHandler = Scd1ForeachBatchHandler(
+    batchProcessor = processor,
+    auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+    targetTableIdentifier = defaultTargetTableIdentifier
+  )
+
+  // 
===========================================================================================
+  // Microbatch validation tests
+  // 
===========================================================================================
+
+  test(
+    "Scd1ForeachBatchHandler invalidates rows with null sequencing before 
merging to aux/target " +
+    "tables."
+  ) {
+    createAuxTable()
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq 
= Some(10L))))
+
+    val batch = microbatchOf(sourceSchema)(
+      Row(1, "invalid", null, false)
+    )
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        exec.execute(batch, batchId = 123L)
+      },
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_SEQUENCE",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "123",
+        "nullCount" -> "1"
+      )
+    )
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(resultAuxTable.collect().isEmpty)
+    checkAnswer(resultTargetTable, Row(1, "old", Row(null, 10L)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler invalidates rows with a null key column before 
merging to aux/target"
+  ) {
+    createAuxTable()
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq 
= Some(10L))))
+
+    // Two rows have a null id; one row is well-formed. The validator must 
surface a count
+    // of two without writing anything to the aux or target table.
+    val batch = microbatchOf(sourceSchema)(
+      Row(null, "no-id-1", 5L, false),
+      Row(2, "ok", 6L, false),
+      Row(null, "no-id-2", 7L, true)
+    )
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        exec.execute(batch, batchId = 13L)
+      },
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_KEY",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "13",
+        "nullKeyCounts" -> "`id`=2"
+      )
+    )
+    val resultAuxTable = 
spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val resultTargetTable = 
spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(resultAuxTable.collect().isEmpty)
+    checkAnswer(resultTargetTable, Row(1, "old", Row(null, 10L)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler invalidates rows when any column of a composite key is null"
+  ) {
+    // The key is the pair [country, city]. The NULL_KEY error is expected to list one null
+    // count per key column, in the order the keys were configured (country, then city).
+    val sourceWithCompositeKey = new StructType()
+      .add("country", StringType)
+      .add("city", StringType)
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+    // In this test the auxiliary and target tables use the same layout: the two key columns
+    // followed by the CDC metadata column.
+    def keysWithCdcMetadata: StructType = new StructType()
+      .add("country", StringType)
+      .add("city", StringType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+    val compositeKeyHandler = Scd1ForeachBatchHandler(
+      batchProcessor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("country"), UnqualifiedColumnName("city")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1,
+          deleteCondition = Some(F.col("is_delete"))
+        ),
+        resolvedSequencingType = LongType
+      ),
+      auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+      targetTableIdentifier = defaultTargetTableIdentifier
+    )
+
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, keysWithCdcMetadata)
+    createTable(defaultTargetIdent, defaultTargetTableIdentifier, keysWithCdcMetadata)
+
+    // Rows 1 and 3 have a null country; rows 2 and 3 have a null city (row 3 lacks both).
+    val batchWithNullKeys = microbatchOf(sourceWithCompositeKey)(
+      Row(null, "Boston", 1L, false),
+      Row("US", null, 2L, false),
+      Row(null, null, 3L, false)
+    )
+    val validationFailure = intercept[AnalysisException] {
+      compositeKeyHandler.execute(batchWithNullKeys, batchId = 7L)
+    }
+
+    checkError(
+      exception = validationFailure,
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_KEY",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "7",
+        "nullKeyCounts" -> "`country`=2, `city`=2"
+      )
+    )
+
+    // The rejected batch must leave both tables empty.
+    val auxAfterFailure = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfterFailure = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(auxAfterFailure.collect().isEmpty)
+    assert(targetAfterFailure.collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler surfaces the null-sequence error before the null-key error"
+  ) {
+    // The single row below has both a null id and a null sequence, so it violates two
+    // validations at once. The expected condition is NULL_SEQUENCE: the sequence check must
+    // be reported ahead of the key check to preserve the existing precedence.
+    createAuxTable()
+    createTargetTable()
+
+    val batch = microbatchOf(sourceSchema)(
+      Row(null, "bad", null, false)
+    )
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        exec.execute(batch, batchId = 99L)
+      },
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_SEQUENCE",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "99",
+        "nullCount" -> "1"
+      )
+    )
+
+    // As in the other validation-failure tests of this suite, also verify that the rejected
+    // batch wrote nothing: both tables were created without rows and must still be empty.
+    val resultAuxTable = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val resultTargetTable = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(resultAuxTable.collect().isEmpty)
+    assert(resultTargetTable.collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler validates that the microbatch's sequencing column is orderable"
+  ) {
+    // The target starts with one live row (id=1, upsertSeq=10) so the test can show that the
+    // rejected batch did not modify it.
+    val liveRow = Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    createAuxTable()
+    createTargetTable(liveRow)
+
+    // "seq" is declared as map<string,int> here; the handler is expected to reject it with
+    // NON_ORDERABLE_SEQUENCE and report that data type.
+    val mapSequencedSchema = new StructType()
+      .add("id", IntegerType)
+      .add("value", StringType)
+      .add("seq", MapType(StringType, IntegerType))
+      .add("is_delete", BooleanType)
+    val mapSequencedBatch = microbatchOf(mapSequencedSchema)(
+      Row(1, "invalid", Map("k" -> 1), false)
+    )
+    val validationFailure = intercept[AnalysisException] {
+      exec.execute(mapSequencedBatch, batchId = 124L)
+    }
+
+    checkError(
+      exception = validationFailure,
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NON_ORDERABLE_SEQUENCE",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "124",
+        "dataType" -> "map<string,int>"
+      )
+    )
+
+    // No tombstone was recorded and the pre-existing target row is unchanged.
+    val auxAfterFailure = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfterFailure = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(auxAfterFailure.collect().isEmpty)
+    checkAnswer(targetAfterFailure, Row(1, "old", Row(null, 10L)))
+  }
+
+  // 
===========================================================================================
+  // Core SCD1 transformation tests
+  // 
===========================================================================================
+
+  test(
+    "Scd1ForeachBatchHandler drops stale microbatch rows using auxiliary tombstones and writes " +
+    "fresh upserts"
+  ) {
+    // id=1 already has a tombstone with deleteSeq=10; the target is created without rows.
+    val existingTombstone = Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None))
+    createAuxTable(existingTombstone)
+    createTargetTable()
+
+    // id=1 arrives at seq=5, below its tombstone; id=2 arrives at seq=20 with no prior state.
+    val staleUpsert = Row(1, "stale", 5L, false)
+    val freshUpsert = Row(2, "fresh", 20L, false)
+    exec.execute(microbatchOf(sourceSchema)(staleUpsert, freshUpsert), batchId = 0L)
+
+    // The tombstone is unchanged and only the fresh row lands in the target.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(auxAfter, Row(1, Row(10L, null)))
+    checkAnswer(targetAfter, Row(2, "fresh", Row(null, 20L)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler persists a newer delete as a tombstone and removes the target row"
+  ) {
+    // No tombstones yet; the target holds id=1 with upsertSeq=10.
+    val liveRow = Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    createAuxTable()
+    createTargetTable(liveRow)
+
+    // A delete at seq=20, above the live row's seq=10.
+    val newerDelete = microbatchOf(sourceSchema)(Row(1, "unused", 20L, true))
+    exec.execute(newerDelete, batchId = 1L)
+
+    // The delete is kept as a tombstone (deleteSeq=20) and the target row is gone.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(auxAfter, Row(1, Row(20L, null)))
+    assert(targetAfter.collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler deduplicates the raw microbatch before merging into the target"
+  ) {
+    // The target holds id=1 with upsertSeq=10; the aux table has no rows.
+    val liveRow = Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    createAuxTable()
+    createTargetTable(liveRow)
+
+    // Two upserts for the same key in one batch, at seq=15 and seq=20.
+    val olderDuplicate = Row(1, "ignored-older", 15L, false)
+    val newerDuplicate = Row(1, "newer", 20L, false)
+    exec.execute(microbatchOf(sourceSchema)(olderDuplicate, newerDuplicate), batchId = 2L)
+
+    // Only the seq=20 event is reflected in the target; no tombstone is written.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(auxAfter.collect().isEmpty)
+    checkAnswer(targetAfter, Row(1, "newer", Row(null, 20L)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler reconciles out-of-order events when ExcludeColumns hides the " +
+    "sequencing column"
+  ) {
+    // "seq" and "is_delete" are excluded from the persisted columns, yet both are still
+    // referenced by the sequencing expression and the delete condition. Six batches of
+    // out-of-order events for id=1 must converge on the correct target and aux state
+    // without those two columns ever being materialized.
+    val hiddenColumns = Seq(UnqualifiedColumnName("seq"), UnqualifiedColumnName("is_delete"))
+    val handlerWithHiddenColumns = Scd1ForeachBatchHandler(
+      batchProcessor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("id")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1,
+          deleteCondition = Some(F.col("is_delete")),
+          columnSelection = Some(ColumnSelection.ExcludeColumns(hiddenColumns))
+        ),
+        resolvedSequencingType = LongType
+      ),
+      auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+      targetTableIdentifier = defaultTargetTableIdentifier
+    )
+
+    createAuxTable()
+    createTargetTable()
+
+    // batchId 0: two upserts for the same key; the higher-sequenced one (seq=3) is inserted.
+    val firstBatch = microbatchOf(sourceSchema)(
+      Row(1, "alice", 1L, false),
+      Row(1, "bob", 3L, false)
+    )
+    handlerWithHiddenColumns.execute(firstBatch, batchId = 0L)
+
+    // batchId 1: a late upsert at seq=2 must not replace the live row at seq=3.
+    val lateOlderUpsert = microbatchOf(sourceSchema)(Row(1, "carol", 2L, false))
+    handlerWithHiddenColumns.execute(lateOlderUpsert, batchId = 1L)
+
+    // batchId 2: an upsert at seq=4 is newer than everything so far and wins.
+    val newerUpsert = microbatchOf(sourceSchema)(Row(1, "dave", 4L, false))
+    handlerWithHiddenColumns.execute(newerUpsert, batchId = 2L)
+
+    // batchId 3: a late delete at seq=2 must not remove the live row at seq=4.
+    val lateOlderDelete = microbatchOf(sourceSchema)(Row(1, null, 2L, true))
+    handlerWithHiddenColumns.execute(lateOlderDelete, batchId = 3L)
+
+    val targetAfterFourBatches = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(targetAfterFourBatches, Row(1, "dave", Row(null, 4L)))
+
+    // batchId 4: a delete at seq=5 is newer than the live row and removes it.
+    val newerDelete = microbatchOf(sourceSchema)(Row(1, null, 5L, true))
+    handlerWithHiddenColumns.execute(newerDelete, batchId = 4L)
+
+    // batchId 5: an upsert at seq=4 predates the seq=5 tombstone and must be suppressed.
+    val upsertBehindTombstone = microbatchOf(sourceSchema)(Row(1, "ghost", 4L, false))
+    handlerWithHiddenColumns.execute(upsertBehindTombstone, batchId = 5L)
+
+    // Final state: no target row, and a single tombstone with deleteSeq=5.
+    val auxAfterSixBatches = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfterSixBatches = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(targetAfterSixBatches.collect().isEmpty)
+    checkAnswer(auxAfterSixBatches, Row(1, Row(5L, null)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler upserts an existing target row when a higher-sequenced event arrives"
+  ) {
+    // id=1 is live in the target with upsertSeq=1; the aux table has no rows.
+    val currentRow = Row(1, "alice", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(1L)))
+    createAuxTable()
+    createTargetTable(currentRow)
+
+    // An upsert at seq=2 outranks the stored seq=1.
+    val newerUpsert = microbatchOf(sourceSchema)(Row(1, "bob", 2L, false))
+    exec.execute(newerUpsert, batchId = 0L)
+
+    // The target row now carries the new value with upsertSeq=2; the aux table stays empty.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(targetAfter, Row(1, "bob", Row(null, 2L)))
+    assert(auxAfter.collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler records an aux tombstone for a delete on a nonexistent key without" +
+      " affecting the target"
+  ) {
+    // Both tables are created without rows, so id=99 is unknown to the target.
+    createAuxTable()
+    createTargetTable()
+
+    // A delete at seq=1 for the unknown key.
+    val deleteForUnknownKey = microbatchOf(sourceSchema)(Row(99, null, 1L, true))
+    exec.execute(deleteForUnknownKey, batchId = 0L)
+
+    // No target row appears, but the delete is still kept as a tombstone (deleteSeq=1) so
+    // that a strictly older upsert for id=99 arriving in a later batch can be suppressed.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    assert(targetAfter.collect().isEmpty)
+    checkAnswer(auxAfter, Row(99, Row(1L, null)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler ignores a late-arriving upsert with a sequence below the target's" +
+      " last upsert"
+  ) {
+    // id=1 is live in the target with upsertSeq=5; the aux table has no rows.
+    val currentRow = Row(1, "alice", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5L)))
+    createAuxTable()
+    createTargetTable(currentRow)
+
+    // An upsert at seq=2 is older than the stored seq=5.
+    val lateUpsert = microbatchOf(sourceSchema)(Row(1, "bob", 2L, false))
+    exec.execute(lateUpsert, batchId = 0L)
+
+    // The target row keeps its value and upsertSeq=5; nothing is written to the aux table.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(targetAfter, Row(1, "alice", Row(null, 5L)))
+    assert(auxAfter.collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler ignores a late-arriving lower-seq delete but still records the aux" +
+      " tombstone"
+  ) {
+    // Incoming deletes are recorded in the auxiliary table whether or not they displace a
+    // target row, so that later events at or below the same sequence are filtered
+    // consistently.
+    val currentRow = Row(1, "alice", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5L)))
+    createAuxTable()
+    createTargetTable(currentRow)
+
+    // A delete at seq=2 is older than the live row's upsertSeq=5.
+    val lateDelete = microbatchOf(sourceSchema)(Row(1, "alice", 2L, true))
+    exec.execute(lateDelete, batchId = 0L)
+
+    // The live row survives untouched, and a tombstone with deleteSeq=2 is recorded.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(targetAfter, Row(1, "alice", Row(null, 5L)))
+    checkAnswer(auxAfter, Row(1, Row(2L, null)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler resolves a within-batch delete and higher-sequenced upsert as an" +
+      " upsert insert"
+  ) {
+    // Deduplication within a batch keeps the highest-sequenced event per key, whatever its
+    // kind. The upsert at seq=3 outranks the delete at seq=2, so the row is inserted into
+    // the target and no tombstone is recorded for that key.
+    createAuxTable()
+    createTargetTable()
+
+    val deleteAtSeq2 = Row(1, "alice", 2L, true)
+    val upsertAtSeq3 = Row(1, "bob", 3L, false)
+    exec.execute(microbatchOf(sourceSchema)(deleteAtSeq2, upsertAtSeq3), batchId = 0L)
+
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    checkAnswer(targetAfter, Row(1, "bob", Row(null, 3L)))
+    assert(auxAfter.collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler treats a composite key as a single identifier and isolates rows by" +
+      " full key"
+  ) {
+    // Keys are [country, city]. Two of the three incoming rows share a country (US) but no
+    // two rows share the full pair, so each must end up as its own row in the target.
+    val cityFeedSchema = new StructType()
+      .add("country", StringType)
+      .add("city", StringType)
+      .add("population", LongType)
+      .add("seq", LongType)
+      .add("is_delete", BooleanType)
+    // Auxiliary table: key columns plus CDC metadata.
+    val cityAuxSchema = new StructType()
+      .add("country", StringType)
+      .add("city", StringType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+    // Target table: key columns, the one data column, and CDC metadata. "seq" and
+    // "is_delete" are excluded via ExcludeColumns below.
+    val cityTargetSchema = new StructType()
+      .add("country", StringType)
+      .add("city", StringType)
+      .add("population", LongType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+    val excludedColumns = Seq(UnqualifiedColumnName("seq"), UnqualifiedColumnName("is_delete"))
+    val compositeKeyHandler = Scd1ForeachBatchHandler(
+      batchProcessor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("country"), UnqualifiedColumnName("city")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1,
+          deleteCondition = Some(F.col("is_delete")),
+          columnSelection = Some(ColumnSelection.ExcludeColumns(excludedColumns))
+        ),
+        resolvedSequencingType = LongType
+      ),
+      auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+      targetTableIdentifier = defaultTargetTableIdentifier
+    )
+
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, cityAuxSchema)
+    createTable(defaultTargetIdent, defaultTargetTableIdentifier, cityTargetSchema)
+
+    // Three upserts, all at seq=1, each for a different (country, city) pair.
+    val threeCities = microbatchOf(cityFeedSchema)(
+      Row("US", "New York", 8000000L, 1L, false),
+      Row("US", "Los Angeles", 4000000L, 1L, false),
+      Row("UK", "London", 9000000L, 1L, false)
+    )
+    compositeKeyHandler.execute(threeCities, batchId = 0L)
+
+    // All three rows are present in the target with upsertSeq=1; no tombstones were written.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    val expectedCities = Seq(
+      Row("UK", "London", 9000000L, Row(null, 1L)),
+      Row("US", "Los Angeles", 4000000L, Row(null, 1L)),
+      Row("US", "New York", 8000000L, Row(null, 1L))
+    )
+    checkAnswer(targetAfter.orderBy("country", "city"), expectedCities)
+    assert(auxAfter.collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler leaves unrelated target rows untouched when only one key is updated"
+  ) {
+    // Two live rows, both with upsertSeq=1; only id=1 is touched by the batch.
+    val aliceRow = Row(1, "alice", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(1L)))
+    val bobRow = Row(2, "bob", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(1L)))
+    createAuxTable()
+    createTargetTable(aliceRow, bobRow)
+
+    val updateForAlice = microbatchOf(sourceSchema)(Row(1, "alice-updated", 2L, false))
+    exec.execute(updateForAlice, batchId = 0L)
+
+    // id=1 advances to upsertSeq=2 with the new value; id=2 is exactly as it was seeded.
+    val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+    val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+    val expectedRows = Seq(
+      Row(1, "alice-updated", Row(null, 2L)),
+      Row(2, "bob", Row(null, 1L))
+    )
+    checkAnswer(targetAfter.orderBy("id"), expectedRows)
+    assert(auxAfter.collect().isEmpty)
+  }
+
+  // 
===========================================================================================
+  // Case-sensitivity tests
+  // 
===========================================================================================
+
+  // Column references below are deliberately UPPERCASE ("ID", "SEQ", "IS_DELETE", "VALUE"),
+  // whereas the suite's source, auxiliary, and target schemas are lowercase. The
+  // case-sensitivity tests run the same execute() with this processor under different
+  // SQLConf settings to verify that the session's case-sensitivity flag drives every stage
+  // of the pipeline.
+  private val mixedCaseProcessor = {
+    val upperCaseColumns = Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("VALUE"))
+    Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("ID")),
+        sequencing = F.col("SEQ"),
+        storedAsScdType = ScdType.Type1,
+        deleteCondition = Some(F.col("IS_DELETE")),
+        columnSelection = Some(ColumnSelection.IncludeColumns(upperCaseColumns))
+      ),
+      resolvedSequencingType = LongType
+    )
+  }
+
+  // Handler wrapping mixedCaseProcessor and pointing at the suite's default auxiliary and
+  // target tables. Declared as a def, so every access constructs a new handler.
+  private def mixedCaseExec: Scd1ForeachBatchHandler =
+    Scd1ForeachBatchHandler(
+      targetTableIdentifier = defaultTargetTableIdentifier,
+      auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+      batchProcessor = mixedCaseProcessor
+    )
+
+  test(
+    "Scd1ForeachBatchHandler honors case-insensitive analysis from the batch dataframe's session"
+  ) {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      // With case-insensitive analysis, every stage of execute (validation, dedup, target
+      // column projection, tombstone application, merge to aux, merge to target) must
+      // resolve the UPPERCASE ChangeArgs references against the lowercase schema, so this
+      // plays out as an ordinary upsert from seq=1 to seq=2.
+      val storedRow = Row(1, "alice", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(1L)))
+      createAuxTable()
+      createTargetTable(storedRow)
+
+      val handler = mixedCaseExec
+      val upsertAtSeq2 = microbatchOf(sourceSchema)(Row(1, "bob", 2L, false))
+      handler.execute(upsertAtSeq2, batchId = 0L)
+
+      val auxAfter = spark.read.table(defaultAuxTableIdentifier.quotedString)
+      val targetAfter = spark.read.table(defaultTargetTableIdentifier.quotedString)
+      checkAnswer(targetAfter, Row(1, "bob", Row(null, 2L)))
+      assert(auxAfter.collect().isEmpty)
+    }
+  }
+
+  test(
+    "Scd1ForeachBatchHandler honors case-sensitive analysis from the batch dataframe's session"
+  ) {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+      // Under case-sensitive analysis the UPPERCASE ChangeArgs references must not be
+      // silently normalized to the lowercase schema. execute() is expected to fail with an
+      // AnalysisException rather than fall back to case-insensitive matching anywhere.
+      createAuxTable()
+      createTargetTable()
+
+      intercept[AnalysisException] {
+        val handler = mixedCaseExec
+        handler.execute(microbatchOf(sourceSchema)(Row(1, "bob", 2L, false)), batchId = 0L)
+      }
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to