This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new cb28819952da [SPARK-56953][SDP] Implement SCD1 Batch Processor;
foreachBatch Callback
cb28819952da is described below
commit cb28819952da6da9b69bcb3e0ab15da33fc0213f
Author: AnishMahto <[email protected]>
AuthorDate: Sun May 24 18:49:55 2026 -0700
[SPARK-56953][SDP] Implement SCD1 Batch Processor; foreachBatch Callback
Approved AutoCDC SPIP:
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
--------
This is a stacked PR. Review incremental diff here:
https://github.com/AnishMahto/spark/compare/SPARK-56927-SCD1-merge-microbatch-onto-target...SPARK-56953-SCD1-foreachbatch-callback
Incremental diff that includes [SPARK-56953] [SPARK-56927][SPARK-56923]:
https://github.com/AnishMahto/spark/compare/SPARK-56249-merge-tombstones-onto-microbatch...SPARK-56953-SCD1-foreachbatch-callback
--------
**Preamble:**
The SCD type 1 flow is a foreachBatch streaming query on an input
change-data-feed, and is responsible for reconciling the incoming change data
onto some target table that follows SCD1 replication semantics.
SCD1 flows also maintain an "auxiliary" table that tracks state for
early-arriving, out-of-order events. Each microbatch must also reconcile
against this auxiliary table and update its state appropriately for future
microbatches.
**foreachBatch Callback:**
Implementation of the actual callback that will be passed into the
foreachBatch streaming query for SCD1 flows.
The callback validates the microbatch and then invokes the
`Scd1BatchProcessor` helpers to fully reconcile the change-data-feed
microbatch onto the auxiliary and target tables.
Introduce Scd1ForeachBatchHandlerSuite to exercise the end-to-end business
logic in unit tests. In the future, once this is hooked up to an actual flow
execution, we will introduce E2E _integration_ tests that additionally verify
schema evolution, interactions with other dataflow graph entities, etc.
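Additionally, add Scd1BatchProcessorMergeSuite to cover the
`Scd1BatchProcessor` methods that perform `MERGE INTO` against a v2 table, and
refactor shared test helper functions into a common
`AutoCdcCatalogExecutionTestBase`.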
Closes #56016 from AnishMahto/SPARK-56953-SCD1-foreachbatch-callback.
Authored-by: AnishMahto <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit 30bdf0b39fa6b28ea149ddd43ecd962b2b3c0cd2)
Signed-off-by: DB Tsai <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 23 +
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 191 +++++-
.../autocdc/Scd1ForeachBatchHandler.scala | 73 +++
.../sql/pipelines/autocdc/ScdBatchValidator.scala | 100 ++++
.../autocdc/AutoCdcCatalogExecutionTestBase.scala | 123 ++++
.../autocdc/Scd1BatchProcessorMergeSuite.scala | 541 +++++++++++++++++
.../autocdc/Scd1ForeachBatchHandlerSuite.scala | 639 +++++++++++++++++++++
7 files changed, 1686 insertions(+), 4 deletions(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index f50c5554b5dc..7632f70698f2 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -203,6 +203,29 @@
],
"sqlState" : "22023"
},
+ "AUTOCDC_MICROBATCH_VALIDATION" : {
+ "message" : [
+ "AutoCDC flow on table <tableName> in batch <batchId> failed microbatch
validation."
+ ],
+ "subClass" : {
+ "NON_ORDERABLE_SEQUENCE" : {
+ "message" : [
+ "The sequencing column has non-orderable type <dataType>. The
sequencing column must be of a type that supports ordering."
+ ]
+ },
+ "NULL_KEY" : {
+ "message" : [
+ "The microbatch contains rows with null values in the following key
column(s): <nullKeyCounts>. All rows must have non-null values for every key
column."
+ ]
+ },
+ "NULL_SEQUENCE" : {
+ "message" : [
+ "The microbatch contains <nullCount> row(s) with a null sequencing
value. All rows must have a non-null sequencing value."
+ ]
+ }
+ },
+ "sqlState" : "22000"
+ },
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier
<columnName> (parts: <nameParts>)."
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index e80d43b11554..32aebc8924c0 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.pipelines.autocdc
import org.apache.spark.SparkException
import org.apache.spark.sql.{functions => F, AnalysisException}
import org.apache.spark.sql.Column
+import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.QuotingUtils
import org.apache.spark.sql.classic.DataFrame
import org.apache.spark.sql.types.{DataType, StructField, StructType}
@@ -37,6 +38,45 @@ case class Scd1BatchProcessor(
changeArgs: ChangeArgs,
resolvedSequencingType: DataType) {
+ /**
+ * Reconcile a CDC microbatch into the canonical form that the auxiliary-
and target-table
+ * merges consume. Composes the per-step transforms in the only order that
produces correct
+ * SCD1 semantics:
+ *
+ * 1. [[deduplicateMicrobatch]]: collapse same-key events to the latest by
sequence.
+ * 2. [[extendMicrobatchRowsWithCdcMetadata]]: project the operational
`_cdc_metadata` column
+ * (must run before column selection, which may drop inputs the
metadata expressions
+ * reference).
+ * 3. [[projectTargetColumnsOntoMicrobatch]]: apply the user-defined
column selection while
+ * preserving the CDC metadata column.
+ * 4. [[applyTombstonesToMicrobatch]]: filter out late-arriving events
superseded by
+ * tombstones already recorded in the auxiliary table.
+ *
+ * The per-step methods are kept package-visible so that focused unit tests
can pin each
+ * transform's behavior independently. This method itself is package-visible
so that
+ * [[Scd1ForeachBatchHandler]] can call it after running
[[ScdBatchValidator.validateMicrobatch]]
+ * - validation is intentionally not folded in here, as it must run before
any of these
+ * transforms touch the data.
+ *
+ * @param batchDf The validated incoming CDC microbatch.
+ * @param auxiliaryTableDf A snapshot of the auxiliary table for tombstone
reconciliation.
+ * Must contain at minimum the key columns +
`_cdc_metadata`.
+ * @return The reconciled microbatch, ready to be merged onto both tables.
+ */
+ private[autocdc] def reconcileMicrobatch(
+ batchDf: DataFrame,
+ auxiliaryTableDf: DataFrame): DataFrame = {
+ val deduplicated = deduplicateMicrobatch(validatedMicrobatch = batchDf)
+ val withCdcMetadata =
extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch = deduplicated)
+ val projected = projectTargetColumnsOntoMicrobatch(
+ microbatchWithCdcMetadataDf = withCdcMetadata
+ )
+ applyTombstonesToMicrobatch(
+ microbatchDf = projected,
+ auxiliaryTableDf = auxiliaryTableDf
+ )
+ }
+
/**
* Deduplicate the incoming CDC microbatch by key, keeping the most recent
event per key
* as ordered by [[ChangeArgs.sequencing]].
@@ -51,7 +91,7 @@ case class Scd1BatchProcessor(
*
* The schema of the returned dataframe matches the schema of the microbatch
exactly.
*/
- def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = {
+ private[autocdc] def deduplicateMicrobatch(validatedMicrobatch: DataFrame):
DataFrame = {
// The `max_by` API can only return a single column, so pack/unpack the
entire row into a
// temporary column before and after the `max_by` operation.
val winningRowCol = Scd1BatchProcessor.winningRowColName
@@ -88,7 +128,8 @@ case class Scd1BatchProcessor(
* The returned dataframe has all of the columns in the input microbatch +
the CDC metadata
* column.
*/
- def extendMicrobatchRowsWithCdcMetadata(validatedMicrobatch: DataFrame):
DataFrame = {
+ private[autocdc] def extendMicrobatchRowsWithCdcMetadata(
+ validatedMicrobatch: DataFrame): DataFrame = {
// Proactively validate the reserved CDC metadata column does not exist in
the microbatch.
validateCdcMetadataColumnNotPresent(validatedMicrobatch)
@@ -123,7 +164,8 @@ case class Scd1BatchProcessor(
* Returned dataframe's schema is: all of the user-selected columns in the
input dataframe as per
* [[ChangeArgs.columnSelection]] + the CDC metadata column.
*/
- def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf:
DataFrame): DataFrame = {
+ private[autocdc] def projectTargetColumnsOntoMicrobatch(
+ microbatchWithCdcMetadataDf: DataFrame): DataFrame = {
val caseSensitiveColumnComparison =
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
@@ -178,7 +220,7 @@ case class Scd1BatchProcessor(
* The returned filtered dataframe has the same schema as the input
microbatch, but with only
* the rows that remain unaffected by any known tombstones.
*/
- def applyTombstonesToMicrobatch(
+ private[autocdc] def applyTombstonesToMicrobatch(
microbatchDf: DataFrame,
auxiliaryTableDf: DataFrame): DataFrame = {
val aliasedMicrobatchDf = microbatchDf.alias("microbatch")
@@ -212,6 +254,147 @@ case class Scd1BatchProcessor(
)
}
+ /**
+ * Merge the reconciled (deduplicated per key) microbatch onto the auxiliary
table,
+ * advancing or deleting existing tombstones and inserting new tombstones
for previously
+ * untracked keys.
+ *
+ * After the merge, the auxiliary table has the same schema as before, but
with the latest
+ * tombstone data per key.
+ *
+ * @param reconciledMicrobatchDf The deduplicated microbatch.
+ * @param auxiliaryTableIdentifier The identifier of the auxiliary table.
+ */
+ private[autocdc] def mergeMicrobatchOntoAuxiliaryTable(
+ reconciledMicrobatchDf: DataFrame,
+ auxiliaryTableIdentifier: TableIdentifier
+ ): Unit = {
+ val auxIdentQuoted = auxiliaryTableIdentifier.quotedString
+ val meta = Scd1BatchProcessor.cdcMetadataColName
+
+ // Project the reconciled microbatch down to just keys + `_cdc_metadata`;
data columns are
+ // irrelevant for the auxiliary table and should not be persisted.
+ val reducedMicrobatch = reconciledMicrobatchDf
+ .select(changeArgs.keys.map(k => F.col(k.quoted)) :+ F.col(meta): _*)
+ .as("reducedMicrobatch")
+
+ val microbatchCdcMetadata: Column = F.col(s"reducedMicrobatch.`$meta`")
+ val incomingDelete: Column =
Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadata)
+ val incomingUpsert: Column =
Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadata)
+
+ val auxCdcMetadata: Column = F.col(s"$auxIdentQuoted.`$meta`")
+ val auxDelete: Column = Scd1BatchProcessor.deleteSequenceOf(auxCdcMetadata)
+
+ val doKeysMatch = changeArgs.keys
+ .map(k => F.col(s"reducedMicrobatch.${k.quoted}") ===
F.col(s"$auxIdentQuoted.${k.quoted}"))
+ .reduce(_ && _)
+
+ val incomingRowRepresentsDeleteEvent =
+ incomingDelete.isNotNull && (incomingUpsert.isNull || incomingDelete >
incomingUpsert)
+
+ reducedMicrobatch
+ .mergeInto(auxIdentQuoted, doKeysMatch)
+ // Incoming delete is newer than the stored one: advance the high-water
mark.
+ .whenMatched(
+ incomingRowRepresentsDeleteEvent && incomingDelete > auxDelete
+ )
+ .update(Map(s"$auxIdentQuoted.`$meta`" -> microbatchCdcMetadata))
+ // Incoming upsert is newer than the stored delete: the key was
re-inserted after the
+ // delete, so the aux tombstone is stale - remove it to prevent
unbounded growth.
+ .whenMatched(
+ !incomingRowRepresentsDeleteEvent && incomingUpsert >= auxDelete
+ )
+ .delete()
+ // New delete for a key not yet tracked, add it to auxiliary table. Note
that in the
+ // reconciled microbatch, there is at most one event for key, which
represents the latest
+ // known event for the key. If the latest known event is a delete, it
must be a tombstone.
+ .whenNotMatched(incomingRowRepresentsDeleteEvent)
+ .insertAll()
+ .merge()
+ }
+
+ /**
+ * Merge the reconciled (deduplicated, tombstone applied, and column
selection + metadata
+ * column projected) microbatch onto the target table, as per SCD1 semantics.
+ *
+ * Microbatch invariants:
+ * - Exactly one of {upsert, delete} version is non-null, the other is
null.
+ * - There is at most one event per key, representing the latest known
event for the key
+ * across the microbatch and auxiliary table.
+ *
+ * Target table invariants:
+ * - Target table only contains live rows; delete sequence is always null,
upsert sequence
+ * is always non-null.
+ *
+ * @param reconciledMicrobatchDf The reconciled microbatch dataframe.
+ * @param targetTableIdentifier The identifier of the target table.
+ */
+ private[autocdc] def mergeMicrobatchOntoTarget(
+ reconciledMicrobatchDf: DataFrame,
+ targetTableIdentifier: TableIdentifier
+ ): Unit = {
+ val meta = Scd1BatchProcessor.cdcMetadataColName
+
+ val destinationTableStr = targetTableIdentifier.quotedString
+ // (Re-)alias the reconciled microbatch DF for easy reference for the
remainder of the merge.
+ val microbatchDf = reconciledMicrobatchDf.as("microbatch")
+
+ val microbatchCdcMetadataCol = F.col(s"microbatch.`$meta`")
+ val destinationCdcMetadataCol =
+ F.col(s"$destinationTableStr.`$meta`")
+
+ val microbatchDeleteVersionField =
+ Scd1BatchProcessor.deleteSequenceOf(microbatchCdcMetadataCol)
+ val microbatchUpsertVersionField =
+ Scd1BatchProcessor.upsertSequenceOf(microbatchCdcMetadataCol)
+ val destinationUpsertVersionField =
+ Scd1BatchProcessor.upsertSequenceOf(destinationCdcMetadataCol)
+
+ val keysMatch = changeArgs.keys
+ .map(k =>
+ F.col(s"microbatch.${k.quoted}") ===
F.col(s"$destinationTableStr.${k.quoted}")
+ )
+ .reduce(_ && _)
+
+ // Upsert beats existing row if incoming upsert sequence is geq to the
upsert sequence on
+ // the target.
+ val incomingWinsUpsert = microbatchUpsertVersionField.isNotNull &&
+ microbatchUpsertVersionField >= destinationUpsertVersionField
+
+ // Delete beats existing row if delete sequencing is strictly greater than
the upsert
+ // sequence on the target. This is an arbitrary but deliberate choice to
maintain that
+ // upserts get priority over deletes on duplicate sequencing.
+ val incomingWinsDelete = microbatchDeleteVersionField.isNotNull &&
+ microbatchDeleteVersionField > destinationUpsertVersionField
+
+ // When the incoming upsert wins against an existing record, the entire
row (all columns)
+ // will be overwritten, including the CDC metadata column. We only exclude
keys because
+ // most merge implementations require that join columns are not being
mutated, even if
+ // the mutation is a no-op.
+ val resolver = microbatchDf.sparkSession.sessionState.conf.resolver
+ val keyNames = changeArgs.keys.map(_.name)
+ val columnsToUpdateWhenIncomingWinsUpsert: Map[String, Column] =
+ microbatchDf.columns
+ .filterNot(c => keyNames.exists(resolver(_, c)))
+ .map { c =>
+ val quotedCol = QuotingUtils.quoteIdentifier(c)
+ s"$destinationTableStr.$quotedCol" -> F.col(s"microbatch.$quotedCol")
+ }
+ .toMap
+
+ microbatchDf
+ .mergeInto(destinationTableStr, keysMatch)
+ .whenMatched(incomingWinsDelete)
+ .delete()
+ .whenMatched(incomingWinsUpsert)
+ .update(columnsToUpdateWhenIncomingWinsUpsert)
+ // New key: only insert upserts; deletes for absent keys are no-ops for
the target table
+ // merge, and instead would have been inserted as tombstones into the
auxiliary table.
+ .whenNotMatched(microbatchDeleteVersionField.isNull)
+ .insertAll()
+ .merge()
+ }
+
private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit
= {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandler.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandler.scala
new file mode 100644
index 000000000000..c286f26c8263
--- /dev/null
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandler.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+
+/**
+ * Exposes an API to execute one SCD Type 1 AutoCDC microbatch reconciliation
on a
+ * foreachBatch streaming query.
+ */
+case class Scd1ForeachBatchHandler(
+ batchProcessor: Scd1BatchProcessor,
+ auxiliaryTableIdentifier: TableIdentifier,
+ targetTableIdentifier: TableIdentifier) {
+
+ /**
+ * Process a single CDC microbatch and merge it into the auxiliary and
target tables.
+ *
+ * Idempotent under same-`batchId` replay: both merges are gated on sequence
inequalities,
+ * so a partial failure between them is reconciled correctly when
foreachBatch retries the
+ * whole batch.
+ */
+ def execute(batchDf: DataFrame, batchId: Long): Unit = {
+ ScdBatchValidator(
+ destinationIdentifier = targetTableIdentifier,
+ changeArgs = batchProcessor.changeArgs,
+ batchDf = batchDf,
+ batchId = batchId
+ ).validateMicrobatch()
+
+ val reconciledMicrobatch = batchProcessor.reconcileMicrobatch(
+ batchDf = batchDf,
+ // Aux holds at most one row per currently-active tombstone (revived
keys are GC'd
+ // by mergeMicrobatchOntoAuxiliaryTable), so it generally stays small
enough for a broadcast
+ // join. Future optimizations: key-pruned reads, table format-aware
clustering and tombstone
+ // TTL.
+ auxiliaryTableDf = batchDf.sparkSession.read.table(
+ auxiliaryTableIdentifier.quotedString
+ )
+ )
+
+ batchProcessor.mergeMicrobatchOntoAuxiliaryTable(
+ reconciledMicrobatchDf = reconciledMicrobatch,
+ auxiliaryTableIdentifier = auxiliaryTableIdentifier
+ )
+
+ // Failure between these two merges is safe under foreachBatch retry: the
aux merge
+ // only ever mutates a tombstone when this batch's event makes it stale
(strictly newer
+ // delete advances it) or redundant (`>=` upsert revives the key, GC'ing
the tombstone),
+ // so on retry those preconditions no longer hold against the
just-advanced aux state -
+ // the aux merge is a no-op and the target merge replays as if for the
first time.
+ batchProcessor.mergeMicrobatchOntoTarget(
+ reconciledMicrobatchDf = reconciledMicrobatch,
+ targetTableIdentifier = targetTableIdentifier
+ )
+ }
+}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ScdBatchValidator.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ScdBatchValidator.scala
new file mode 100644
index 000000000000..0d2f47d1c4a6
--- /dev/null
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ScdBatchValidator.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Column}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.expressions.RowOrdering
+import org.apache.spark.sql.classic.DataFrame
+
+/**
+ * Per-microbatch input validation shared by SCD merge executors. Throws with
a clear,
+ * user-actionable error if the batch violates the CDC contract.
+ *
+ * @param destinationIdentifier The identifier of the target table, used for
error messages.
+ * @param changeArgs The user-specified AutoCDC parameters.
+ * @param batchDf The incoming microbatch to validate.
+ * @param batchId The structured-streaming batch id, used for error messages.
+ */
+case class ScdBatchValidator(
+ destinationIdentifier: TableIdentifier,
+ changeArgs: ChangeArgs,
+ batchDf: DataFrame,
+ batchId: Long) {
+
+ /**
+ * Validates that the sequencing column is orderable and that no row has a
null sequencing
+ * value or a null value in any key column. The per-row checks are folded
into a single
+ * aggregation so the microbatch is scanned exactly once.
+ */
+ def validateMicrobatch(): Unit = {
+ val seqType = batchDf.select(changeArgs.sequencing).schema.head.dataType
+ if (!RowOrdering.isOrderable(seqType)) {
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_MICROBATCH_VALIDATION.NON_ORDERABLE_SEQUENCE",
+ messageParameters = Map(
+ "tableName" -> destinationIdentifier.quotedString,
+ "batchId" -> batchId.toString,
+ "dataType" -> seqType.catalogString
+ )
+ )
+ }
+
+ val sequencingNullCount: Column =
+ F.count(F.when(changeArgs.sequencing.isNull,
F.lit(1))).as("__autocdc_seq_null_count")
+ val perKeyNullCount: Seq[Column] = changeArgs.keys.map { key =>
+ F.count(F.when(F.col(key.quoted).isNull, F.lit(1)))
+ .as(s"__autocdc_key_null_count_${key.name}")
+ }
+ // The null count aggregations are laid out in the returned dataframe as:
+ // [# rows with null sequence, # rows with null for key1, ..., # rows with
null for keyN].
+ val nullCountsResultDf =
+ batchDf.agg(sequencingNullCount, perKeyNullCount: _*).head()
+
+ val numRowsWithNullSequence = nullCountsResultDf.getLong(0)
+ if (numRowsWithNullSequence > 0) {
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_MICROBATCH_VALIDATION.NULL_SEQUENCE",
+ messageParameters = Map(
+ "tableName" -> destinationIdentifier.quotedString,
+ "batchId" -> batchId.toString,
+ "nullCount" -> numRowsWithNullSequence.toString
+ )
+ )
+ }
+
+ val keysWithNullEntries = changeArgs.keys.zipWithIndex.flatMap { case
(key, idx) =>
+ val rowCountForKey = nullCountsResultDf.getLong(idx + 1)
+ Option.when(rowCountForKey > 0)(key -> rowCountForKey)
+ }
+ if (keysWithNullEntries.nonEmpty) {
+ val nullKeyCounts = keysWithNullEntries
+ .map { case (key, count) => s"${key.quoted}=$count" }
+ .mkString(", ")
+
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_MICROBATCH_VALIDATION.NULL_KEY",
+ messageParameters = Map(
+ "tableName" -> destinationIdentifier.quotedString,
+ "batchId" -> batchId.toString,
+ "nullKeyCounts" -> nullKeyCounts
+ )
+ )
+ }
+ }
+}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
new file mode 100644
index 000000000000..0dc0a9027660
--- /dev/null
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/AutoCdcCatalogExecutionTestBase.scala
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.connector.catalog.{Identifier, TableInfo}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import
org.apache.spark.sql.connector.catalog.InMemoryRowLevelOperationTableCatalog
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{DataType, LongType, StructType}
+
+/**
+ * Shared test infrastructure for AutoCDC suites that exercise execution paths
performing
+ * v2 `MERGE INTO` operations against an in-memory catalog. Provides:
+ *
+ * - A pre-configured [[InMemoryRowLevelOperationTableCatalog]] registered
before each test
+ * and reset after each test.
+ * - Stable v2 [[Identifier]] and Catalyst [[TableIdentifier]] values for an
auxiliary table
+ * and a target table.
+ * - Schema-agnostic primitives: table creation, microbatch [[DataFrame]]
construction,
+ * and CDC metadata helpers parameterized by sequencing type.
+ *
+ * Suites that mix this in are responsible for defining their own schemas
(auxiliary, target,
+ * source) and processor / exec instances, then writing thin wrappers around
[[createTable]]
+ * to seed those schemas.
+ */
+trait AutoCdcCatalogExecutionTestBase {
+ this: SharedSparkSession with BeforeAndAfter =>
+
+ protected val catalogName: String = "cat"
+ protected val namespace: String = "ns1"
+ protected val auxTableName: String = "aux_table"
+ protected val targetTableName: String = "target_table"
+
+ /** Default DSv2 [[Identifier]] for the auxiliary table. */
+ protected val defaultAuxIdent: Identifier = Identifier.of(Array(namespace),
auxTableName)
+ /** Default DSv2 [[Identifier]] for the target table. */
+ protected val defaultTargetIdent: Identifier =
Identifier.of(Array(namespace), targetTableName)
+
+ /** Default catalyst three-part [[TableIdentifier]] for the auxiliary table.
*/
+ protected val defaultAuxTableIdentifier: TableIdentifier = TableIdentifier(
+ table = auxTableName,
+ database = Some(namespace),
+ catalog = Some(catalogName)
+ )
+ /** Default catalyst three-part [[TableIdentifier]] for the target table. */
+ protected val defaultTargetTableIdentifier: TableIdentifier =
TableIdentifier(
+ table = targetTableName,
+ database = Some(namespace),
+ catalog = Some(catalogName)
+ )
+
+ before {
+ spark.conf.set(
+ s"spark.sql.catalog.$catalogName",
+ classOf[InMemoryRowLevelOperationTableCatalog].getName
+ )
+ }
+
+ after {
+ spark.sessionState.catalogManager.reset()
+ spark.sessionState.conf.unsetConf(s"spark.sql.catalog.$catalogName")
+ }
+
+ /**
+ * Schema of the [[Scd1BatchProcessor.cdcMetadataColName]] struct column for
a given
+ * sequencing column type. Defaults to [[LongType]] because all current SCD1
tests use
+ * `Long` sequencing.
+ */
+ protected def cdcMetadataColSchemaType(sequencingType: DataType = LongType):
StructType =
+ new StructType()
+ .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, sequencingType)
+ .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, sequencingType)
+
+ /**
+ * Build a [[Row]] matching the [[Scd1BatchProcessor.cdcMetadataColName]]
struct's two fields,
+ * in the order produced by [[Scd1BatchProcessor.constructCdcMetadataCol]]:
+ */
+ protected def cdcMetadataRow[T](deleteSeq: Option[T], upsertSeq: Option[T]):
Row =
+ Row(deleteSeq.getOrElse(null), upsertSeq.getOrElse(null))
+
+ /**
+ * Create a table in the test catalog under the given DSv2 [[Identifier]]
using `schema`,
+ * optionally seeding it with `seedRows`. Pass no rows to create an empty
table.
+ */
+ protected def createTable(
+ ident: Identifier,
+ tableIdentifier: TableIdentifier,
+ schema: StructType,
+ seedRows: Row*): Unit = {
+ spark.sessionState.catalogManager
+ .catalog(catalogName)
+ .asTableCatalog
+ .createTable(ident, new TableInfo.Builder().withSchema(schema).build())
+
+ if (seedRows.nonEmpty) {
+ microbatchOf(schema)(seedRows:
_*).writeTo(tableIdentifier.quotedString).append()
+ }
+ }
+
+ /** Build a microbatch [[DataFrame]] from explicit `rows` and an explicit
`schema`. */
+ protected def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
+ spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
new file mode 100644
index 000000000000..475d25f5aa2c
--- /dev/null
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorMergeSuite.scala
@@ -0,0 +1,541 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.{functions => F, AnalysisException, Row}
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * Tests for [[Scd1BatchProcessor]] methods that perform a `MERGE INTO` against a registered
+ * v2 table. These tests require a v2 catalog that supports row-level operations
+ * (set up by [[AutoCdcCatalogExecutionTestBase]]) and run actual writes through Catalyst's
+ * row-level-operations machinery, so they are kept separate from the pure-DataFrame-transform
+ * tests in [[Scd1BatchProcessorSuite]].
+ */
+class Scd1BatchProcessorMergeSuite
+  extends QueryTest
+  with SharedSparkSession
+  with BeforeAndAfter
+  with AutoCdcCatalogExecutionTestBase {
+
+  /**
+   * Minimal valid shape for both the auxiliary table and microbatch inputs in these tests:
+   * a single key column `id` plus the CDC metadata struct. The auxiliary table genuinely
+   * has only this shape in production, and the merge function reduces its microbatch input
+   * down to keys + `_cdc_metadata` regardless of incoming data columns -- so most tests can
+   * use this single schema for both ends.
+   */
+  private val minimalSchema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+  /** Minimal target-table shape: one key, one data column, and CDC metadata. */
+  private val targetSchema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add("value", StringType)
+    .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+  /**
+   * A processor with a single key column `id`. `sequencing` is irrelevant for
+   * merge functions in this suite: they operate entirely on the already-computed CDC metadata
+   * column, never on the raw sequencing expression.
+   */
+  private val processor = Scd1BatchProcessor(
+    changeArgs = ChangeArgs(
+      keys = Seq(UnqualifiedColumnName("id")),
+      sequencing = F.lit(0L),
+      storedAsScdType = ScdType.Type1
+    ),
+    resolvedSequencingType = LongType
+  )
+
+  /** Create the auxiliary table using [[minimalSchema]], optionally seeded with `seedRows`. */
+  private def createAuxTable(seedRows: Row*): Unit =
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, minimalSchema, seedRows: _*)
+
+  /** Create the target table using [[targetSchema]], optionally seeded with `seedRows`. */
+  private def createTargetTable(seedRows: Row*): Unit =
+    createTable(defaultTargetIdent, defaultTargetTableIdentifier, targetSchema, seedRows: _*)
+
+  /** Read the current contents of the auxiliary table at [[defaultAuxTableIdentifier]]. */
+  private def readAuxTable() = spark.read.table(defaultAuxTableIdentifier.quotedString)
+
+  /** Read the current contents of the target table at [[defaultTargetTableIdentifier]]. */
+  private def readTargetTable() = spark.read.table(defaultTargetTableIdentifier.quotedString)
+
+  /**
+   * Build an auxiliary-table schema with the given key columns followed by the standard CDC
+   * metadata struct. Used by tests that need a non-trivial key shape (composite or dotted).
+   */
+  private def customKeyAuxSchema(keyColumns: Seq[(String, DataType)]): StructType = {
+    val withKeys = keyColumns.foldLeft(new StructType()) { case (s, (name, dt)) =>
+      s.add(name, dt)
+    }
+    withKeys.add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+  }
+
+  /**
+   * Create the auxiliary table at [[defaultAuxIdent]] using `schema` and optionally seed it
+   * with `seedRows`. Used by tests that need a non-trivial key shape (composite or dotted).
+   */
+  private def createAuxTableWithSchema(schema: StructType, seedRows: Row*): Unit =
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, schema, seedRows: _*)
+
+  /**
+   * `(name, dataType)` pairs of `schema`'s fields, used to compare two schemas for structural
+   * equivalence while deliberately ignoring nullability and metadata.
+   */
+  private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] =
+    schema.fields.map(f => (f.name, f.dataType)).toSeq
+
+  // =============== mergeMicrobatchOntoAuxiliaryTable tests ===============
+
+  test("mergeMicrobatchOntoAuxiliaryTable replaces an existing tombstone with a newer " +
+    "microbatch tombstone, dropping any microbatch-only data columns") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)))
+
+    // The microbatch carries an extra `value` data column that has no place in the auxiliary
+    // table. mergeMicrobatchOntoAuxiliaryTable must project it away before merging, both to
+    // satisfy MergeIntoTable's schema requirements and to keep the auxiliary table free of
+    // unrelated columns.
+    val microbatchSchema = new StructType()
+      .add("id", IntegerType)
+      .add("value", StringType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+    val microbatch = microbatchOf(microbatchSchema)(
+      Row(1, "data-leak", cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    val resultAuxTable = readAuxTable()
+    // Tombstone advanced to delete=20, with exactly one row per key (no duplicate tombstones).
+    checkAnswer(resultAuxTable, Row(1, Row(20L, null)))
+    // Schema strictly matches minimalSchema; the `value` column was dropped, not smuggled in.
+    assert(columnNamesAndDataTypes(resultAuxTable.schema) ==
+      columnNamesAndDataTypes(minimalSchema))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable deletes an existing tombstone when superseded by a " +
+    "newer microbatch upsert") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    assert(readAuxTable().collect().isEmpty)
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable inserts a new tombstone for a previously-untracked " +
+    "key") {
+    createAuxTable()
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable leaves rows for unrelated keys untouched") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)))
+
+    // Microbatch event affects a different key entirely; the existing tombstone for id=1 must
+    // not be touched even though the new tombstone's sequence is much larger.
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(2, cdcMetadataRow(deleteSeq = Some(100L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    checkAnswer(readAuxTable(), Seq(
+      Row(1, Row(10L, null)),
+      Row(2, Row(100L, null))
+    ))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch deletes whose sequence is older " +
+    "than the existing tombstone") {
+    // This documents that mergeMicrobatchOntoAuxiliaryTable's contract is stronger than just
+    // relying on applyTombstonesToMicrobatch having filtered out stale events upstream: even
+    // an unfiltered stale incoming delete must not regress the high-water mark.
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(5L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable ignores microbatch upserts whose sequence is older " +
+    "than the existing tombstone") {
+    createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(5L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable applies the tied-sequence asymmetry: equal deletes " +
+    "are kept, equal upserts delete the tombstone") {
+    // On a delete<->upsert sequencing tie, upsert events are given priority over deletes;
+    // therefore an incoming upsert with the same sequence as a tombstone should delete the
+    // tombstone. On a delete<->delete sequencing tie, the effect is a no-op. This is an
+    // internal SCD1 tie-breaking convention, not a publicly documented contract.
+    createAuxTable(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)),
+      Row(2, cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = None))
+    )
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)),
+      Row(2, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    // Row 1's tombstone remains the same, but row 2's tombstone should be marked as stale and
+    // deleted.
+    checkAnswer(readAuxTable(), Row(1, Row(10L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable upsert event for different key does not affect " +
+    "tombstone") {
+    createAuxTable(Row(2, cdcMetadataRow(deleteSeq = Some(5L), upsertSeq = None)))
+
+    val microbatch = microbatchOf(minimalSchema)(
+      // Although the upsert seq is 10, this is for key=1; tombstone for key=2 should be unaffected.
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    )
+
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    checkAnswer(readAuxTable(), Row(2, Row(5L, null)))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable is idempotent across a microbatch that exercises " +
+    "every merge clause") {
+    // The auxiliary table starts with three tombstones; the microbatch then exercises every
+    // merge clause simultaneously:
+    //   - id=1: aux tombstone superseded by a microbatch upsert
+    //   - id=2: aux tombstone advanced by a newer microbatch delete
+    //   - id=3: untouched by the microbatch
+    //   - id=4: new tombstone for an untracked key
+    createAuxTable(
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)),
+      Row(2, cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = None)),
+      Row(3, cdcMetadataRow(deleteSeq = Some(30L), upsertSeq = None))
+    )
+
+    val microbatch = microbatchOf(minimalSchema)(
+      Row(1, cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15L))),
+      Row(2, cdcMetadataRow(deleteSeq = Some(25L), upsertSeq = None)),
+      Row(4, cdcMetadataRow(deleteSeq = Some(40L), upsertSeq = None))
+    )
+
+    val expectedAfterMerge = Seq(
+      Row(2, Row(25L, null)),
+      Row(3, Row(30L, null)),
+      Row(4, Row(40L, null))
+    )
+
+    // First merge applies all three clauses exactly once.
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+    checkAnswer(readAuxTable(), expectedAfterMerge)
+
+    // Re-applying the same microbatch is a no-op:
+    //   - id=1 is absent from aux; whenNotMatched is gated on delete events => skipped.
+    //   - id=2 has tied delete (incoming==aux); strict `>` in the update clause fails.
+    //   - id=4 has tied delete (incoming==aux); same reason.
+    processor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+    checkAnswer(readAuxTable(), expectedAfterMerge)
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable correctly inserts tombstones for composite key") {
+    // Composite key: (region, customer_id). The merge join condition is the AND of every key
+    // column equality, so an aux row sharing only `region` with the microbatch must NOT be
+    // touched, while the microbatch row must be inserted as a new tombstone.
+    val compositeSchema = customKeyAuxSchema(Seq(
+      "region" -> StringType,
+      "customer_id" -> IntegerType
+    ))
+    createAuxTableWithSchema(
+      compositeSchema,
+      Row("US", 99, cdcMetadataRow(deleteSeq = Some(50L), upsertSeq = None))
+    )
+
+    val compositeKeyProcessor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")),
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val microbatch = microbatchOf(compositeSchema)(
+      Row("US", 1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None))
+    )
+
+    compositeKeyProcessor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    checkAnswer(readAuxTable(), Seq(
+      Row("US", 99, Row(50L, null)),
+      Row("US", 1, Row(10L, null))
+    ))
+  }
+
+  test("mergeMicrobatchOntoAuxiliaryTable correctly merges for backticked/dotted keys") {
+    // Even though the column is a backticked identifier in user-facing DDL, Spark drops the
+    // backticks during schema resolution so the field name is the literal `user.id`. The merge
+    // path must propagate the user's quoted identifier through `k.quoted` so the join condition
+    // and update target both resolve to the same physical column.
+    val dottedKeySchema = customKeyAuxSchema(Seq("user.id" -> IntegerType))
+    createAuxTableWithSchema(
+      dottedKeySchema,
+      Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None))
+    )
+
+    val dottedKeyProcessor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("`user.id`")),
+        sequencing = F.lit(0L),
+        storedAsScdType = ScdType.Type1
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    // We expect the existing tombstone with del seq=10 to be advanced to 20 if the merge matches
+    // dotted keys correctly.
+    val microbatch = microbatchOf(dottedKeySchema)(
+      Row(1, cdcMetadataRow(deleteSeq = Some(20L), upsertSeq = None))
+    )
+
+    dottedKeyProcessor.mergeMicrobatchOntoAuxiliaryTable(microbatch, defaultAuxTableIdentifier)
+
+    checkAnswer(readAuxTable(), Row(1, Row(20L, null)))
+  }
+
+  // =============== mergeMicrobatchOntoTarget tests ===============
+
+  test("mergeMicrobatchOntoTarget updates an existing row with a newer upsert") {
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))))
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, defaultTargetTableIdentifier)
+
+    val resultTargetTable = readTargetTable()
+    checkAnswer(resultTargetTable, Row(1, "new", Row(null, 20L)))
+    assert(columnNamesAndDataTypes(resultTargetTable.schema) ==
+      columnNamesAndDataTypes(targetSchema))
+  }
+
+  test("mergeMicrobatchOntoTarget deletes an existing row with a newer delete") {
+    createTargetTable(
+      Row(1, "delete-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))),
+      Row(2, "keep-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "unused", cdcMetadataRow(deleteSeq = Some(15L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, defaultTargetTableIdentifier)
+
+    checkAnswer(readTargetTable(), Row(2, "keep-me", Row(null, 20L)))
+  }
+
+  test("mergeMicrobatchOntoTarget inserts new upserts but not new (tombstone) deletes") {
+    createTargetTable(Row(1, "existing", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))))
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(2, "insert-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L))),
+      Row(3, "do-not-insert", cdcMetadataRow(deleteSeq = Some(30L), upsertSeq = None))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, defaultTargetTableIdentifier)
+
+    checkAnswer(readTargetTable(), Seq(
+      Row(1, "existing", Row(null, 10L)),
+      Row(2, "insert-me", Row(null, 20L))
+    ))
+  }
+
+  test("mergeMicrobatchOntoTarget ignores stale upserts and stale deletes") {
+    createTargetTable(
+      Row(1, "target-delete-tie", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))),
+      Row(2, "target-newer", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "delete-tie", cdcMetadataRow(deleteSeq = Some(10L), upsertSeq = None)),
+      Row(2, "older-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(15L)))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, defaultTargetTableIdentifier)
+
+    checkAnswer(readTargetTable(), Seq(
+      Row(1, "target-delete-tie", Row(null, 10L)),
+      Row(2, "target-newer", Row(null, 20L))
+    ))
+  }
+
+  test("mergeMicrobatchOntoTarget gives tied upserts priority over the target row") {
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))))
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "same-sequence-upsert", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, defaultTargetTableIdentifier)
+
+    checkAnswer(readTargetTable(), Row(1, "same-sequence-upsert", Row(null, 10L)))
+  }
+
+  test("mergeMicrobatchOntoTarget correctly matches escaped key column names") {
+    // The raw key name contains a backtick, which must be escaped when the name is parsed as a
+    // column identifier during resolution.
+    val rawKeyName = "a`b"
+    val schemaWithSpecialKeyCharacters = new StructType()
+      // The schema stores the raw, unquoted column name, so the raw name is used here as-is.
+      .add(rawKeyName, IntegerType)
+      .add("value", StringType)
+      .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+    createTable(
+      defaultTargetIdent,
+      defaultTargetTableIdentifier,
+      schemaWithSpecialKeyCharacters,
+      Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L)))
+    )
+
+    // The processor receives the quoted form of the key, as a user-supplied identifier would be.
+    val processorForCustomKeySchema = processor.copy(
+      changeArgs = processor.changeArgs.copy(
+        keys = Seq(UnqualifiedColumnName(QuotingUtils.quoteIdentifier(rawKeyName)))
+      )
+    )
+    val microbatch = microbatchOf(schemaWithSpecialKeyCharacters)(
+      Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+    )
+
+    processorForCustomKeySchema.mergeMicrobatchOntoTarget(
+      microbatch,
+      defaultTargetTableIdentifier
+    )
+
+    checkAnswer(readTargetTable(), Row(1, "new", Row(null, 20L)))
+  }
+
+  gridTest(
+    "mergeMicrobatchOntoTarget key column comparison respects spark session case sensitivity"
+  )(Seq(false, true)) { caseSensitive =>
+    withSQLConf("spark.sql.caseSensitive" -> caseSensitive.toString) {
+      createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))))
+
+      val processorWithUpperCaseKey = processor.copy(
+        changeArgs = processor.changeArgs.copy(
+          keys = Seq(UnqualifiedColumnName("ID"))
+        )
+      )
+
+      val microbatch = microbatchOf(targetSchema)(
+        Row(1, "new", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L)))
+      )
+
+      if (caseSensitive) {
+        val ex = intercept[AnalysisException] {
+          processorWithUpperCaseKey.mergeMicrobatchOntoTarget(
+            microbatch,
+            defaultTargetTableIdentifier
+          )
+        }
+        // Intentionally not using checkError here, to avoid asserting on a brittle query context
+        // and a long list of message parameters.
+        assert(ex.errorClass.contains("UNRESOLVED_COLUMN.WITH_SUGGESTION"))
+      } else {
+        processorWithUpperCaseKey.mergeMicrobatchOntoTarget(
+          microbatch,
+          defaultTargetTableIdentifier
+        )
+        checkAnswer(readTargetTable(), Row(1, "new", Row(null, 20L)))
+      }
+    }
+  }
+
+  test("mergeMicrobatchOntoTarget is idempotent across a microbatch") {
+    createTargetTable(
+      Row(1, "delete-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))),
+      Row(2, "update-me", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(20L))),
+      Row(3, "untouched", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(30L)))
+    )
+
+    val microbatch = microbatchOf(targetSchema)(
+      Row(1, "delete-event", cdcMetadataRow(deleteSeq = Some(15L), upsertSeq = None)),
+      Row(2, "updated", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(25L))),
+      Row(4, "inserted", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(40L))),
+      Row(5, "absent-delete", cdcMetadataRow(deleteSeq = Some(50L), upsertSeq = None))
+    )
+
+    val expectedAfterMerge = Seq(
+      Row(2, "updated", Row(null, 25L)),
+      Row(3, "untouched", Row(null, 30L)),
+      Row(4, "inserted", Row(null, 40L))
+    )
+
+    processor.mergeMicrobatchOntoTarget(microbatch, defaultTargetTableIdentifier)
+    checkAnswer(readTargetTable(), expectedAfterMerge)
+
+    // Re-applying the identical microbatch must leave the target unchanged.
+    processor.mergeMicrobatchOntoTarget(microbatch, defaultTargetTableIdentifier)
+    checkAnswer(readTargetTable(), expectedAfterMerge)
+  }
+}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
new file mode 100644
index 000000000000..76790847ede5
--- /dev/null
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala
@@ -0,0 +1,639 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * E2E unit tests for the Scd1ForeachBatchHandler class.
+ */
+class Scd1ForeachBatchHandlerSuite
+ extends QueryTest
+ with SharedSparkSession
+ with BeforeAndAfter
+ with AutoCdcCatalogExecutionTestBase {
+
+  // Raw change-data-feed rows: key, payload, sequencing value, and delete flag.
+  private val sourceSchema = StructType(Seq(
+    StructField("id", IntegerType),
+    StructField("value", StringType),
+    StructField("seq", LongType),
+    StructField("is_delete", BooleanType)))
+
+  // Auxiliary table: the key plus the CDC metadata struct.
+  private val auxiliarySchema = StructType(Seq(
+    StructField("id", IntegerType),
+    StructField(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())))
+
+  // Target table: the key, the `value` data column, and the CDC metadata struct.
+  private val targetSchema = StructType(Seq(
+    StructField("id", IntegerType),
+    StructField("value", StringType),
+    StructField(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())))
+
+  /**
+   * SCD1 processor keyed on `id` and sequenced by `seq`. Rows whose `is_delete` is true are
+   * treated as deletes, and only `id` and `value` are selected for the target.
+   */
+  private val processor = {
+    val selectedColumns = Seq("id", "value").map(UnqualifiedColumnName(_))
+    Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        deleteCondition = Some(F.col("is_delete")),
+        columnSelection = Some(ColumnSelection.IncludeColumns(selectedColumns))
+      ),
+      resolvedSequencingType = LongType
+    )
+  }
+
+  /** Create the auxiliary table from [[auxiliarySchema]], seeding it with any `seedRows`. */
+  private def createAuxTable(seedRows: Row*): Unit = {
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, auxiliarySchema, seedRows: _*)
+  }
+
+  /** Create the target table from [[targetSchema]], seeding it with any `seedRows`. */
+  private def createTargetTable(seedRows: Row*): Unit = {
+    createTable(defaultTargetIdent, defaultTargetTableIdentifier, targetSchema, seedRows: _*)
+  }
+
+  /** A new handler on every call, wired to [[processor]] and the default aux/target tables. */
+  private def exec: Scd1ForeachBatchHandler = {
+    Scd1ForeachBatchHandler(
+      batchProcessor = processor,
+      auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+      targetTableIdentifier = defaultTargetTableIdentifier
+    )
+  }
+
+ //
===========================================================================================
+ // Microbatch validation tests
+ //
===========================================================================================
+
+  test(
+    "Scd1ForeachBatchHandler invalidates rows with null sequencing before merging to aux/target " +
+      "tables."
+  ) {
+    createAuxTable()
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))))
+
+    // A single upsert whose sequencing value is null.
+    val invalidBatch = microbatchOf(sourceSchema)(Row(1, "invalid", null, false))
+    val error = intercept[AnalysisException] {
+      exec.execute(invalidBatch, batchId = 123L)
+    }
+
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_SEQUENCE",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "123",
+        "nullCount" -> "1"
+      )
+    )
+
+    // Neither table may have been modified by the rejected batch.
+    assert(spark.read.table(defaultAuxTableIdentifier.quotedString).collect().isEmpty)
+    checkAnswer(
+      spark.read.table(defaultTargetTableIdentifier.quotedString),
+      Row(1, "old", Row(null, 10L)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler invalidates rows with a null key column before merging to aux/target"
+  ) {
+    createAuxTable()
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))))
+
+    // Two of the three rows have a null `id`; the validator must report a null count of two and
+    // leave both tables untouched.
+    val batchWithNullKeys = microbatchOf(sourceSchema)(
+      Row(null, "no-id-1", 5L, false),
+      Row(2, "ok", 6L, false),
+      Row(null, "no-id-2", 7L, true)
+    )
+    val error = intercept[AnalysisException] {
+      exec.execute(batchWithNullKeys, batchId = 13L)
+    }
+
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_KEY",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "13",
+        "nullKeyCounts" -> "`id`=2"
+      )
+    )
+
+    assert(spark.read.table(defaultAuxTableIdentifier.quotedString).collect().isEmpty)
+    checkAnswer(
+      spark.read.table(defaultTargetTableIdentifier.quotedString),
+      Row(1, "old", Row(null, 10L)))
+  }
+
+  test(
+    "Scd1ForeachBatchHandler invalidates rows when any column of a composite key is null"
+  ) {
+    // The key is the composite [country, city]; per-column null counts must be reported in the
+    // configured key order, i.e. country before city.
+    val keyFields = Seq(StructField("country", StringType), StructField("city", StringType))
+    val compositeSourceSchema = StructType(
+      keyFields :+ StructField("seq", LongType) :+ StructField("is_delete", BooleanType))
+    // The auxiliary and target tables share one shape: both key columns plus the CDC metadata.
+    val cdcMetadataField =
+      StructField(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+    val compositeTableSchema = StructType(keyFields :+ cdcMetadataField)
+
+    val compositeExec = Scd1ForeachBatchHandler(
+      batchProcessor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq("country", "city").map(UnqualifiedColumnName(_)),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1,
+          deleteCondition = Some(F.col("is_delete"))
+        ),
+        resolvedSequencingType = LongType
+      ),
+      auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+      targetTableIdentifier = defaultTargetTableIdentifier
+    )
+
+    createTable(defaultAuxIdent, defaultAuxTableIdentifier, compositeTableSchema)
+    createTable(defaultTargetIdent, defaultTargetTableIdentifier, compositeTableSchema)
+
+    // `country` is null in two rows and `city` is null in two rows; the last row has both null.
+    val batchWithNullKeys = microbatchOf(compositeSourceSchema)(
+      Row(null, "Boston", 1L, false),
+      Row("US", null, 2L, false),
+      Row(null, null, 3L, false)
+    )
+    val error = intercept[AnalysisException] {
+      compositeExec.execute(batchWithNullKeys, batchId = 7L)
+    }
+
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_KEY",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "7",
+        "nullKeyCounts" -> "`country`=2, `city`=2"
+      )
+    )
+
+    assert(spark.read.table(defaultAuxTableIdentifier.quotedString).collect().isEmpty)
+    assert(spark.read.table(defaultTargetTableIdentifier.quotedString).collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler surfaces the null-sequence error before the null-key error"
+  ) {
+    // A single row has both a null sequence and a null id. The validator must surface the
+    // sequence error first to preserve the existing precedence.
+    createAuxTable()
+    createTargetTable()
+
+    val batch = microbatchOf(sourceSchema)(
+      Row(null, "bad", null, false)
+    )
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        exec.execute(batch, batchId = 99L)
+      },
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NULL_SEQUENCE",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "99",
+        "nullCount" -> "1"
+      )
+    )
+    // As with every other validation failure, the rejected batch must not write to either table.
+    assert(spark.read.table(defaultAuxTableIdentifier.quotedString).collect().isEmpty)
+    assert(spark.read.table(defaultTargetTableIdentifier.quotedString).collect().isEmpty)
+  }
+
+  test(
+    "Scd1ForeachBatchHandler validates that the microbatch's sequencing column is orderable"
+  ) {
+    createAuxTable()
+    createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(10L))))
+
+    // Same shape as the regular source, except `seq` is a map, which has no ordering.
+    val mapSequencedSchema = StructType(Seq(
+      StructField("id", IntegerType),
+      StructField("value", StringType),
+      StructField("seq", MapType(StringType, IntegerType)),
+      StructField("is_delete", BooleanType)))
+    val mapSequencedBatch =
+      microbatchOf(mapSequencedSchema)(Row(1, "invalid", Map("k" -> 1), false))
+    val error = intercept[AnalysisException] {
+      exec.execute(mapSequencedBatch, batchId = 124L)
+    }
+
+    checkError(
+      exception = error,
+      condition = "AUTOCDC_MICROBATCH_VALIDATION.NON_ORDERABLE_SEQUENCE",
+      sqlState = "22000",
+      parameters = Map(
+        "tableName" -> defaultTargetTableIdentifier.quotedString,
+        "batchId" -> "124",
+        "dataType" -> "map<string,int>"
+      )
+    )
+
+    assert(spark.read.table(defaultAuxTableIdentifier.quotedString).collect().isEmpty)
+    checkAnswer(
+      spark.read.table(defaultTargetTableIdentifier.quotedString),
+      Row(1, "old", Row(null, 10L)))
+  }
+
+ //
===========================================================================================
+ // Core SCD1 transformation tests
+ //
===========================================================================================
+
+ test(
+ "Scd1ForeachBatchHandler drops stale microbatch rows using auxiliary
tombstones and writes " +
+ "fresh upserts"
+ ) {
+ // The aux table starts with a tombstone for id=1 at delete sequence 10.
+ createAuxTable(Row(1, cdcMetadataRow(deleteSeq = Some(10L), upsertSeq =
None)))
+ createTargetTable()
+
+ val batch = microbatchOf(sourceSchema)(
+ Row(1, "stale", 5L, false),
+ Row(2, "fresh", 20L, false)
+ )
+
+ exec.execute(batch, batchId = 0L)
+
+ // id=1 (seq=5) is older than its tombstone, so it is dropped and the tombstone is kept;
+ // id=2 (seq=20) has no tombstone and is inserted into the target.
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(resultAuxTable, Row(1, Row(10L, null)))
+ checkAnswer(resultTargetTable, Row(2, "fresh", Row(null, 20L)))
+ }
+
+ test(
+ "Scd1ForeachBatchHandler persists a newer delete as a tombstone and
removes the target row"
+ ) {
+ // The target holds id=1, last upserted at seq=10. A delete at seq=20 is newer, so it
+ // removes the target row and is recorded in the aux table as a seq=20 tombstone.
+ createAuxTable()
+ createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq
= Some(10L))))
+
+ val batch = microbatchOf(sourceSchema)(
+ Row(1, "unused", 20L, true)
+ )
+
+ exec.execute(batch, batchId = 1L)
+
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(resultAuxTable, Row(1, Row(20L, null)))
+ assert(resultTargetTable.collect().isEmpty)
+ }
+
+ test(
+ "Scd1ForeachBatchHandler deduplicates the raw microbatch before merging
into the target"
+ ) {
+ // Two upserts for id=1 arrive in one batch (seq=15 and seq=20). Only the seq=20 row
+ // should reach the target, replacing the existing seq=10 row. There are no deletes, so
+ // the aux table stays empty.
+ createAuxTable()
+ createTargetTable(Row(1, "old", cdcMetadataRow(deleteSeq = None, upsertSeq
= Some(10L))))
+
+ val batch = microbatchOf(sourceSchema)(
+ Row(1, "ignored-older", 15L, false),
+ Row(1, "newer", 20L, false)
+ )
+
+ exec.execute(batch, batchId = 2L)
+
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ assert(resultAuxTable.collect().isEmpty)
+ checkAnswer(resultTargetTable, Row(1, "newer", Row(null, 20L)))
+ }
+
+ test(
+ "Scd1ForeachBatchHandler reconciles out-of-order events when
ExcludeColumns hides the " +
+ "sequencing column"
+ ) {
+ // ExcludeColumns omits the sequencing column ("seq") and the delete
marker ("is_delete")
+ // from persisted rows. The sequencing expression itself still drives CDC
reconciliation;
+ // this test verifies that several out-of-order events across six batches
converge to the
+ // correct target state without ever materializing those columns.
+ val excludeProcessor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ deleteCondition = Some(F.col("is_delete")),
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName("seq"),
UnqualifiedColumnName("is_delete"))
+ )
+ )
+ ),
+ resolvedSequencingType = LongType
+ )
+ // Handler that runs the ExcludeColumns processor against the suite's default tables.
+ val excludeExec = Scd1ForeachBatchHandler(
+ batchProcessor = excludeProcessor,
+ auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+ targetTableIdentifier = defaultTargetTableIdentifier
+ )
+
+ createAuxTable()
+ createTargetTable()
+
+ // Batch 1: highest-seq event in the batch wins on insert.
+ excludeExec.execute(
+ microbatchOf(sourceSchema)(
+ Row(1, "alice", 1L, false),
+ Row(1, "bob", 3L, false)
+ ),
+ batchId = 0L
+ )
+
+ // Batch 2: out-of-order older upsert (seq=2) must not overwrite the live
row at seq=3.
+ excludeExec.execute(microbatchOf(sourceSchema)(Row(1, "carol", 2L,
false)), batchId = 1L)
+
+ // Batch 3: even-newer upsert wins.
+ excludeExec.execute(microbatchOf(sourceSchema)(Row(1, "dave", 4L, false)),
batchId = 2L)
+
+ // Batch 4: out-of-order older delete (seq=2) must not erase the live row
at seq=4.
+ excludeExec.execute(microbatchOf(sourceSchema)(Row(1, null, 2L, true)),
batchId = 3L)
+
+ // After batches 1-4 the target holds the seq=4 upsert ("dave"). Neither the seq=2
+ // upsert nor the seq=2 delete displaced it.
+ val targetTableAfterBatch4 =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(targetTableAfterBatch4, Row(1, "dave", Row(null, 4L)))
+
+ // Batch 5: newer delete (seq=5) wipes the row from the target.
+ excludeExec.execute(microbatchOf(sourceSchema)(Row(1, null, 5L, true)),
batchId = 4L)
+
+ // Batch 6: out-of-order pre-delete upsert (seq=4) is suppressed by the
tombstone.
+ excludeExec.execute(microbatchOf(sourceSchema)(Row(1, "ghost", 4L,
false)), batchId = 5L)
+
+ // Final state: the target row is gone, and the aux tombstone carries the newest delete
+ // (seq=5).
+ val auxTableAfterBatch6 =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val targetTableAfterBatch6 =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ assert(targetTableAfterBatch6.collect().isEmpty)
+ checkAnswer(auxTableAfterBatch6, Row(1, Row(5L, null)))
+ }
+
+ test(
+ "Scd1ForeachBatchHandler upserts an existing target row when a
higher-sequenced event arrives"
+ ) {
+ // The existing id=1 row (seq=1) is overwritten by the seq=2 upsert. There is no delete,
+ // so the aux table stays empty.
+ createAuxTable()
+ createTargetTable(Row(1, "alice", cdcMetadataRow(deleteSeq = None,
upsertSeq = Some(1L))))
+
+ exec.execute(microbatchOf(sourceSchema)(Row(1, "bob", 2L, false)), batchId
= 0L)
+
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(resultTargetTable, Row(1, "bob", Row(null, 2L)))
+ assert(resultAuxTable.collect().isEmpty)
+ }
+
+ test(
+ "Scd1ForeachBatchHandler records an aux tombstone for a delete on a
nonexistent key without" +
+ " affecting the target"
+ ) {
+ // A delete event for a key that never existed in the target must still be
recorded in
+ // the auxiliary table, because a strictly older upsert for the same key
arriving in a
+ // later batch must be suppressed by that tombstone.
+ createAuxTable()
+ createTargetTable()
+
+ exec.execute(microbatchOf(sourceSchema)(Row(99, null, 1L, true)), batchId
= 0L)
+
+ // The target stays empty, and the aux table holds a seq=1 tombstone for key 99.
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ assert(resultTargetTable.collect().isEmpty)
+ checkAnswer(resultAuxTable, Row(99, Row(1L, null)))
+ }
+
+ test(
+ "Scd1ForeachBatchHandler ignores a late-arriving upsert with a sequence
below the target's" +
+ " last upsert"
+ ) {
+ // The target row was last upserted at seq=5. An upsert at seq=2 is older and must be
+ // ignored, which leaves the target unchanged and the aux table empty.
+ createAuxTable()
+ createTargetTable(Row(1, "alice", cdcMetadataRow(deleteSeq = None,
upsertSeq = Some(5L))))
+
+ exec.execute(microbatchOf(sourceSchema)(Row(1, "bob", 2L, false)), batchId
= 0L)
+
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(resultTargetTable, Row(1, "alice", Row(null, 5L)))
+ assert(resultAuxTable.collect().isEmpty)
+ }
+
+ test(
+ "Scd1ForeachBatchHandler ignores a late-arriving lower-seq delete but
still records the aux" +
+ " tombstone"
+ ) {
+ // The auxiliary table records every incoming delete event regardless of
whether it
+ // displaces a target row, so future events at or below the same sequence
are filtered
+ // consistently.
+ createAuxTable()
+ createTargetTable(Row(1, "alice", cdcMetadataRow(deleteSeq = None,
upsertSeq = Some(5L))))
+
+ exec.execute(microbatchOf(sourceSchema)(Row(1, "alice", 2L, true)),
batchId = 0L)
+
+ // The seq=2 delete is older than the target's seq=5 upsert, so the target is unchanged.
+ // The seq=2 tombstone still lands in the aux table.
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(resultTargetTable, Row(1, "alice", Row(null, 5L)))
+ checkAnswer(resultAuxTable, Row(1, Row(2L, null)))
+ }
+
+ test(
+ "Scd1ForeachBatchHandler resolves a within-batch delete and
higher-sequenced upsert as an" +
+ " upsert insert"
+ ) {
+ // Within-batch dedup picks the highest-sequenced event regardless of
kind. Here an
+ // upsert at seq=3 beats a delete at seq=2, so the row is inserted into
the target and
+ // no auxiliary tombstone is recorded for the per-key winner.
+ createAuxTable()
+ createTargetTable()
+
+ exec.execute(
+ microbatchOf(sourceSchema)(
+ Row(1, "alice", 2L, true),
+ Row(1, "bob", 3L, false)
+ ),
+ batchId = 0L
+ )
+
+ // The target receives the seq=3 upsert ("bob"), and the aux table stays empty.
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(resultTargetTable, Row(1, "bob", Row(null, 3L)))
+ assert(resultAuxTable.collect().isEmpty)
+ }
+
+ test(
+ "Scd1ForeachBatchHandler treats a composite key as a single identifier and
isolates rows by" +
+ " full key"
+ ) {
+ // Composite [country, city] key. Three rows that overlap on country (US,
US, UK) but
+ // never on the full key must remain three distinct identities in the
target.
+ val compositeSourceSchema = new StructType()
+ .add("country", StringType)
+ .add("city", StringType)
+ .add("population", LongType)
+ .add("seq", LongType)
+ .add("is_delete", BooleanType)
+ // Aux rows carry only the key columns plus CDC metadata. Target rows also carry the
+ // persisted payload column (population).
+ val compositeAuxSchema = new StructType()
+ .add("country", StringType)
+ .add("city", StringType)
+ .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+ val compositeTargetSchema = new StructType()
+ .add("country", StringType)
+ .add("city", StringType)
+ .add("population", LongType)
+ .add(Scd1BatchProcessor.cdcMetadataColName, cdcMetadataColSchemaType())
+
+ // Processor keyed on (country, city). seq and is_delete are excluded from persisted rows.
+ val compositeProcessor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("country"),
UnqualifiedColumnName("city")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ deleteCondition = Some(F.col("is_delete")),
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName("seq"),
UnqualifiedColumnName("is_delete"))
+ )
+ )
+ ),
+ resolvedSequencingType = LongType
+ )
+ val compositeExec = Scd1ForeachBatchHandler(
+ batchProcessor = compositeProcessor,
+ auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+ targetTableIdentifier = defaultTargetTableIdentifier
+ )
+
+ // Create the aux and target tables with the composite-key schemas rather than via the
+ // default createAuxTable()/createTargetTable() helpers.
+ createTable(defaultAuxIdent, defaultAuxTableIdentifier, compositeAuxSchema)
+ createTable(defaultTargetIdent, defaultTargetTableIdentifier,
compositeTargetSchema)
+
+ compositeExec.execute(
+ microbatchOf(compositeSourceSchema)(
+ Row("US", "New York", 8000000L, 1L, false),
+ Row("US", "Los Angeles", 4000000L, 1L, false),
+ Row("UK", "London", 9000000L, 1L, false)
+ ),
+ batchId = 0L
+ )
+
+ // All three rows land in the target as distinct keys. With no deletes, aux stays empty.
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(
+ resultTargetTable.orderBy("country", "city"),
+ Seq(
+ Row("UK", "London", 9000000L, Row(null, 1L)),
+ Row("US", "Los Angeles", 4000000L, Row(null, 1L)),
+ Row("US", "New York", 8000000L, Row(null, 1L))
+ )
+ )
+ assert(resultAuxTable.collect().isEmpty)
+ }
+
+ test(
+ "Scd1ForeachBatchHandler leaves unrelated target rows untouched when only
one key is updated"
+ ) {
+ // Only id=1 receives an update (seq=2). id=2 must keep its original value and its seq=1
+ // metadata.
+ createAuxTable()
+ createTargetTable(
+ Row(1, "alice", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(1L))),
+ Row(2, "bob", cdcMetadataRow(deleteSeq = None, upsertSeq = Some(1L)))
+ )
+
+ exec.execute(microbatchOf(sourceSchema)(Row(1, "alice-updated", 2L,
false)), batchId = 0L)
+
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(
+ resultTargetTable.orderBy("id"),
+ Seq(
+ Row(1, "alice-updated", Row(null, 2L)),
+ Row(2, "bob", Row(null, 1L))
+ )
+ )
+ assert(resultAuxTable.collect().isEmpty)
+ }
+
+ //
===========================================================================================
+ // Case-sensitivity tests
+ //
===========================================================================================
+
+ // A processor that intentionally references columns in UPPERCASE while the
suite's source,
+ // auxiliary, and target schemas use lowercase. The case-sensitivity tests
below run the
+ // same execute() with this processor under different SQLConf settings to
verify the
+ // session's case-sensitivity flag drives every stage of the pipeline.
+ private val mixedCaseProcessor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("ID")),
+ sequencing = F.col("SEQ"),
+ storedAsScdType = ScdType.Type1,
+ deleteCondition = Some(F.col("IS_DELETE")),
+ // Only ID and VALUE are listed for persistence. SEQ and IS_DELETE are referenced above
+ // as the sequencing and delete expressions but are not included here.
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("VALUE"))
+ )
+ )
+ ),
+ resolvedSequencingType = LongType
+ )
+
+ // Builds a new handler on each access (this is a `def`). The handler wires
+ // mixedCaseProcessor to the suite's default auxiliary and target tables.
+ private def mixedCaseExec: Scd1ForeachBatchHandler = Scd1ForeachBatchHandler(
+ batchProcessor = mixedCaseProcessor,
+ auxiliaryTableIdentifier = defaultAuxTableIdentifier,
+ targetTableIdentifier = defaultTargetTableIdentifier
+ )
+
+ test(
+ "Scd1ForeachBatchHandler honors case-insensitive analysis from the batch
dataframe's session"
+ ) {
+ // Every stage of execute (validation, dedup, project target columns,
tombstone
+ // application, merge to aux, merge to target) must resolve the UPPERCASE
column refs in
+ // ChangeArgs against the lowercase schema and produce the correct
target+aux state.
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ createAuxTable()
+ createTargetTable(Row(1, "alice", cdcMetadataRow(deleteSeq = None,
upsertSeq = Some(1L))))
+
+ mixedCaseExec.execute(
+ microbatchOf(sourceSchema)(Row(1, "bob", 2L, false)),
+ batchId = 0L
+ )
+
+ // With the UPPERCASE refs resolved, the seq=2 upsert replaces the seq=1 row, and aux
+ // stays empty.
+ val resultAuxTable =
spark.read.table(defaultAuxTableIdentifier.quotedString)
+ val resultTargetTable =
spark.read.table(defaultTargetTableIdentifier.quotedString)
+ checkAnswer(resultTargetTable, Row(1, "bob", Row(null, 2L)))
+ assert(resultAuxTable.collect().isEmpty)
+ }
+ }
+
+ test(
+ "Scd1ForeachBatchHandler honors case-sensitive analysis from the batch
dataframe's session"
+ ) {
+ // With case-sensitive analysis, the same UPPERCASE ChangeArgs references
against a
+ // lowercase schema must not be silently normalized. Execute must surface
an
+ // AnalysisException rather than fall back to case-insensitive matching
anywhere.
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+ createAuxTable()
+ createTargetTable()
+
+ // NOTE(review): only the exception type is checked here. Neither the error condition
+ // nor the aux/target table contents after the failure are asserted.
+ intercept[AnalysisException] {
+ mixedCaseExec.execute(
+ microbatchOf(sourceSchema)(Row(1, "bob", 2L, false)),
+ batchId = 0L
+ )
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]