This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.2 by this push:
     new 8680857abfe4 [SPARK-56249][SDP] Implement SCD1 Batch Processor; Merge 
Tombstones onto Microbatch
8680857abfe4 is described below

commit 8680857abfe41e0d83ace75ee31e8b428648349f
Author: AnishMahto <[email protected]>
AuthorDate: Fri May 22 16:55:02 2026 -0700

    [SPARK-56249][SDP] Implement SCD1 Batch Processor; Merge Tombstones onto 
Microbatch
    
    Approved AutoCDC SPIP: 
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
    
    --------
    
    **Preamble:**
    
    The SCD type 1 flow is a foreachBatch streaming query on an input 
change-data-feed, and is responsible for reconciling the incoming change data 
onto some target table that follows SCD1 replication semantics.
    
    SCD1 flows also maintain an "auxiliary" table to keep track of the state 
of early-arriving, out-of-order events. Each microbatch will need to be 
reconciled against this auxiliary table as well, and must update the auxiliary 
table's state appropriately for future microbatches.
    
    **Merge Tombstones onto Microbatch:**
    
    The auxiliary table produced by an SCD1 flow will [strictly] store 
tombstones accumulated from the flow's change data feed source thus far.
    
    In SCD1, a tombstone is defined as a delete event that has not been 
overtaken by any upsert event so far (i.e. an upsert event whose sequence is 
greater than or equal to the delete event's sequence).
    
    These events/rows are called tombstones because they represent delete 
events that could still be relevant in closing a late-arriving upsert received 
in future microbatches. But we cannot store this type of row in the target 
table, as it would break the contract of what rows an SCD1 compliant replica 
table contains - hence these tombstones are stored in the auxiliary table.
    
    When a new microbatch is processed, it's possible that it contains such 
late-arriving upsert events, which should be swallowed by some known 
tombstone(s). We need to left anti-join the incoming microbatch with the 
auxiliary table on the tombstones that do indeed match the microbatch's 
late-arriving upserts.
    
    Closes #55993 from AnishMahto/SPARK-56249-merge-tombstones-onto-microbatch.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
    (cherry picked from commit 2a78a09069cc3cb43220b0dda72d9d1146d302b0)
    Signed-off-by: DB Tsai <[email protected]>
---
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala |  54 ++++
 .../autocdc/Scd1BatchProcessorSuite.scala          | 351 ++++++++++++++++++++-
 2 files changed, 400 insertions(+), 5 deletions(-)

diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index 03aaf284f070..e80d43b11554 100644
--- 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -166,6 +166,52 @@ case class Scd1BatchProcessor(
     )
   }
 
+  /**
+   * Left anti-join the microbatch with the auxiliary table on tombstones that 
match against and
+   * effectively delete late-arriving upserts (or stale deletes).
+   *
+   * @param microbatchDf The incoming microbatch dataframe with at minimum all 
of the key
+   *                     columns + CDC metadata column.
+   * @param auxiliaryTableDf Dataframe representing the auxiliary table, with 
at minimum the key
+   *                         columns + CDC metadata column.
+   *
+   * @return The filtered dataframe: it has the same schema as the input 
microbatch, but with only
+   *         the rows that remain unaffected by any known tombstones.
+   */
+  def applyTombstonesToMicrobatch(
+      microbatchDf: DataFrame,
+      auxiliaryTableDf: DataFrame): DataFrame = {
+    val aliasedMicrobatchDf = microbatchDf.alias("microbatch")
+    val aliasedAuxiliaryTableDf = auxiliaryTableDf.alias("auxiliaryTable")
+
+    val cdcMetadata = Scd1BatchProcessor.cdcMetadataColName
+
+    val microbatchCdcMetadata = F.col(s"microbatch.$cdcMetadata")
+    val effectiveSeq = F.greatest(
+      Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata),
+      Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata)
+    )
+    val tombstoneDeleteSeq =
+      
Scd1BatchProcessor.deleteSequenceOf(F.col(s"auxiliaryTable.$cdcMetadata"))
+
+    val keysMatch = changeArgs.keys
+      .map { k =>
+        F.col(s"microbatch.${k.quoted}") === 
F.col(s"auxiliaryTable.${k.quoted}")
+      }
+      .reduce(_ && _)
+
+    // A microbatch row is considered late-arriving (and therefore deleted by 
the tombstone) when
+    // the auxiliary table holds a tombstone for the same key with a strictly 
larger delete
+    // sequence. Both late-arriving upserts and deletes are dropped.
+    val microbatchRowDeletedByTombstone = effectiveSeq < tombstoneDeleteSeq
+
+    aliasedMicrobatchDf.join(
+      right = aliasedAuxiliaryTableDf,
+      joinExprs = keysMatch && microbatchRowDeletedByTombstone,
+      joinType = "left_anti"
+    )
+  }
+
   private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit 
= {
     val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
     val resolver = microbatchSqlConf.resolver
@@ -194,6 +240,14 @@ object Scd1BatchProcessor {
   private[autocdc] val cdcDeleteSequenceFieldName: String = "deleteSequence"
   private[autocdc] val cdcUpsertSequenceFieldName: String = "upsertSequence"
 
+  /** Project the delete sequence out of the CDC metadata column. */
+  private[autocdc] def deleteSequenceOf(cdcMetadataCol: Column): Column =
+    cdcMetadataCol.getField(cdcDeleteSequenceFieldName)
+
+  /** Project the upsert sequence out of the CDC metadata column. */
+  private[autocdc] def upsertSequenceOf(cdcMetadataCol: Column): Column =
+    cdcMetadataCol.getField(cdcUpsertSequenceFieldName)
+
   /**
    * Schema of the CDC metadata struct column for SCD1.
    */
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index a49c89e35755..c78dc123621b 100644
--- 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -41,6 +41,18 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
         .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
     )
 
+  /** DataType for the CDC metadata column, where sequencing type is Long. */
+  private val cdcMetadataColSchemaType: DataType = new StructType()
+    .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+    .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
+
+  /**
+   * Helper to construct a CDC metadata column row, following 
[[cdcMetadataColSchemaType]].
+   */
+  private def cdcMetadataRow(deleteSeq: Option[Long], upsertSeq: 
Option[Long]): Row =
+    Row(deleteSeq.getOrElse(null), upsertSeq.getOrElse(null))
+
+
   /** Build a microbatch [[DataFrame]] from explicit rows and an explicit 
schema. */
   private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
     spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
@@ -53,6 +65,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
   private def columnNamesAndDataTypes(schema: StructType): Seq[(String, 
DataType)] =
     schema.fields.map(f => (f.name, f.dataType)).toSeq
 
+  // =============== deduplicateMicrobatch tests ===============
+
   test("deduplicateMicrobatch keeps only the row with the largest sequence 
value per key") {
     val schema = new StructType()
       .add("id", IntegerType)
@@ -463,6 +477,8 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
     assert(columnNamesAndDataTypes(result.schema) == 
columnNamesAndDataTypes(schema))
   }
 
+  // =============== extendMicrobatchRowsWithCdcMetadata tests ===============
+
   test("extendMicrobatchRowsWithCdcMetadata classifies each row as a delete or 
an upsert " +
     "per deleteCondition") {
     val schema = new StructType()
@@ -861,11 +877,7 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
       // Even if a column is created with backticks via DDL, those backticks 
are consumed by Spark
       // before resolving the schema; they won't show up in the schema field.
       .add("user.id", StringType)
-      .add(
-        Scd1BatchProcessor.cdcMetadataColName,
-        new StructType()
-          .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
-          .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType))
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
 
     val batch = microbatchOf(schema)(
       Row(1, "u-100", Row(null, 10L))
@@ -932,4 +944,333 @@ class Scd1BatchProcessorSuite extends QueryTest with 
SharedSparkSession {
       )
     }
   }
+
+  // =============== applyTombstonesToMicrobatch tests ===============
+
+  /**
+   * Schema for the microbatch input to 
[[Scd1BatchProcessor.applyTombstonesToMicrobatch]]
+   * tests.
+   */
+  private val applyTombstonesToMicrobatchTestMicrobatchSchema: StructType = 
new StructType()
+    // Key column.
+    .add("id", IntegerType)
+    // Data column.
+    .add("value", StringType)
+    // CDC metadata column.
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+  /**
+   * Schema for the auxiliary input to 
[[Scd1BatchProcessor.applyTombstonesToMicrobatch]] tests.
+   *
+   * In practice for SCD1 the auxiliary table only carries key columns and the 
CDC metadata
+   * column -- never user data columns -- so we mirror that production-side 
asymmetry here,
+   * even though the function's API contract would allow a single shared 
schema.
+   */
+  private val applyTombstonesToMicrobatchTestAuxiliarySchema: StructType = new 
StructType()
+    // Key column.
+    .add("id", IntegerType)
+    // CDC metadata column.
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+  test("applyTombstonesToMicrobatch drops late-arriving deletes and upserts 
when a matching " +
+    "tombstone exists for the same key") {
+    // Both microbatch events have an effective sequence strictly less than 
the tombstone's
+    // delete sequence, so they must be anti-joined out of the microbatch 
regardless of whether
+    // they are deletes or upserts.
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(5))),
+      Row(1, "stale-delete", cdcMetadataRow(deleteSeq = Some(7), upsertSeq = 
None))
+    )
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
+
+  test("applyTombstonesToMicrobatch keeps a microbatch row whose effective 
sequence ties the " +
+    "tombstone's delete sequence") {
+    // The join uses strict `<`, so a microbatch row with the same effective 
sequence as the
+    // tombstone is kept. This is an internal tie-breaking convention for SCD1 
only, and is
+    // *not* a publicly documented contract: if external callers ever start 
relying on it, both
+    // this test and the join condition in applyTombstonesToMicrobatch should 
move together.
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "tied-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(10)))
+    )
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "tied-upsert", Row(null, 10L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch keeps microbatch rows whose effective 
sequence exceeds the " +
+    "tombstone's delete sequence") {
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "fresher-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(15))),
+      Row(1, "fresher-delete", cdcMetadataRow(deleteSeq = Some(20), upsertSeq 
= None))
+    )
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq(
+        Row(1, "fresher-upsert", Row(null, 15L)),
+        Row(1, "fresher-delete", Row(20L, null))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch leaves microbatch rows untouched when the 
tombstone targets " +
+    "a different key") {
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stays", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    // Tombstone on a different key with a much larger sequence; the key match 
must guard
+    // against cross-key application no matter how stale the microbatch row's 
sequence is.
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(2, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "stays", Row(null, 5L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch with composite keys requires every key 
column to match") {
+    val schema = new StructType()
+      .add("region", StringType)
+      .add("customer_id", IntegerType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+    val microbatch = microbatchOf(schema)(
+      Row("US", 1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5))),
+      Row("US", 2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    // Tombstone matches on `region` only; `customer_id` differs from every 
microbatch row.
+    // The join condition is the AND of all key column equalities, so neither 
microbatch row
+    // should be dropped.
+    val auxiliary = microbatchOf(schema)(
+      Row("US", 99, cdcMetadataRow(deleteSeq = Some(1000), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("region"), 
UnqualifiedColumnName("customer_id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq(
+        Row("US", 1, Row(null, 5L)),
+        Row("US", 2, Row(null, 5L))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch supports backticked key names containing a 
literal dot") {
+    val schema = new StructType()
+      .add("user.id", IntegerType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType)
+
+    val microbatch = microbatchOf(schema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5)))
+    )
+    val auxiliary = microbatchOf(schema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("`user.id`")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
+
+  test("applyTombstonesToMicrobatch is a no-op when the auxiliary table is 
empty") {
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "kept-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(5))),
+      Row(2, "kept-delete", cdcMetadataRow(deleteSeq = Some(7), upsertSeq = 
None))
+    )
+
+    // Empty auxiliary: no rows means the left-anti join cannot match any 
microbatch row, so the
+    // microbatch passes through untouched regardless of its contents.
+
+    // Conceptually, this means there are no tombstones that could potentially 
have delete-matched
+    // against incoming rows in the microbatch.
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)()
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Seq(
+        Row(1, "kept-upsert", Row(null, 5L)),
+        Row(2, "kept-delete", Row(7L, null))
+      )
+    )
+  }
+
+  test("applyTombstonesToMicrobatch keeps microbatch rows when the matching 
aux row has a " +
+    "null deleteSequence") {
+    // SCD1's tombstone-merge invariant guarantees aux rows always have a 
non-null
+    // deleteSequence, but if a corrupt aux row ever does carry a null 
deleteSequence, the
+    // join's `<` predicate evaluates to null (SQL 3-valued logic) and the 
microbatch row is
+    // retained -- a safe fallback that never silently drops data.
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "kept-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(5)))
+    )
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    checkAnswer(
+      df = processor.applyTombstonesToMicrobatch(microbatch, auxiliary),
+      expectedAnswer = Row(1, "kept-upsert", Row(null, 5L))
+    )
+  }
+
+  test("applyTombstonesToMicrobatch is unaffected by  stale tombstones in 
auxiliary table") {
+    // SCD1's tombstone-merge invariant guarantees at most one tombstone per 
key in the
+    // auxiliary, but if multiple ever coexist for the same key, the left-anti 
semantics drop
+    // the microbatch row whenever *any* matching tombstone has a strictly 
greater
+    // deleteSequence.
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(8)))
+    )
+    // Two tombstones on key=1: one stale (deleteSeq=5, doesn't dominate the 
microbatch row's
+    // effective seq of 8), one fresh (deleteSeq=10, dominates). The fresh one 
alone is enough
+    // to drop the microbatch row.
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(5), upsertSeq = None)),
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = None))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
+
+  test("applyTombstonesToMicrobatch ignores the aux row's upsertSequence even 
when it is set") {
+    // SCD1's tombstone-merge invariant guarantees aux rows always have a null 
upsertSequence
+    // (by definition, an aux row is an unswallowed tombstone). But if a 
corrupt aux row ever
+    // has both fields set, only its deleteSequence is read by the join 
condition; the
+    // upsertSequence is never inspected, so the row continues to behave as a 
pure tombstone.
+    val microbatch = 
microbatchOf(applyTombstonesToMicrobatchTestMicrobatchSchema)(
+      Row(1, "stale-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = 
Some(5)))
+    )
+    // Aux row with both fields populated; only deleteSeq=10 drives the 
tombstone-drop decision.
+    val auxiliary = 
microbatchOf(applyTombstonesToMicrobatchTestAuxiliarySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10), upsertSeq = Some(20)))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        // Sequencing is irrelevant for applyTombstonesToMicrobatch; it is 
already encoded
+        // into the CDC metadata column.
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.applyTombstonesToMicrobatch(microbatch, auxiliary)
+    assert(result.collect().isEmpty)
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to