This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.x by this push:
new bf766b80bbf9 [SPARK-56882][SDP] Implement SCD1 Batch Processor; Target
Column Projection
bf766b80bbf9 is described below
commit bf766b80bbf9acbcfd8ff2ffdad103a703da1b4a
Author: AnishMahto <[email protected]>
AuthorDate: Fri May 22 13:39:14 2026 -0700
[SPARK-56882][SDP] Implement SCD1 Batch Processor; Target Column Projection
Approved AutoCDC SPIP:
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
--------
**Preamble:**
The SCD type 1 flow is a foreachBatch streaming query on an input
change-data-feed, and is responsible for reconciling the incoming change data
onto some target table that follows SCD1 replication semantics.
SCD1 flows also maintain an "auxiliary" table to keep track of the state of
early-arriving, out-of-order events. Each microbatch will need to
reconcile against this auxiliary table as well, and update the auxiliary
table's state appropriately for future microbatches.
**Target Column Projection:**
As per the SPIP and `ChangeArgs.columnSelection`, users are allowed to
specify the set of columns that actually gets persisted in the target table.
Any columns not selected should be dropped before target table
merge/persistence.
We should project only these selected columns onto the microbatch so that
its dataframe is in the correct shape prior to CDC processing and merging into
the target table.
Closes #55991 from
AnishMahto/SPARK-56882-SCD1-project-target-columns-onto-microbatch.
Authored-by: AnishMahto <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit af2cfc0b7fb5aee1b04b6e9592bd7e3bff39c66b)
Signed-off-by: DB Tsai <[email protected]>
---
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 52 +++++
.../autocdc/Scd1BatchProcessorSuite.scala | 217 +++++++++++++++++++++
2 files changed, 269 insertions(+)
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index d50c30919ba8..03aaf284f070 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -114,6 +114,58 @@ case class Scd1BatchProcessor(
)
}
+ /**
+ * Project the user-defined column selection onto the microbatch. By this
point the input
+ * microbatch should already have projected its CDC metadata, because it's
possible that the
+ * user-defined column selection drops columns that are otherwise necessary
to compute the
+ * CDC metadata.
+ *
+ * Returned dataframe's schema is: all of the user-selected columns in the
input dataframe as per
+ * [[ChangeArgs.columnSelection]] + the CDC metadata column.
+ */
+ def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf:
DataFrame): DataFrame = {
+ val caseSensitiveColumnComparison =
+
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+ // The user schema is the microbatch schema after dropping the system CDC
metadata column.
+ // We project out the system column before applying user selection and
project it back in
+ // afterwards, so that users cannot control whether this [necessary]
column shows up in the
+ // target table.
+ val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
+ schemaName = "microbatch",
+ schema = microbatchWithCdcMetadataDf.schema,
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
+ )
+ ),
+ caseSensitive = caseSensitiveColumnComparison
+ )
+
+ val userSelectedColumnsInMicrobatchSchema =
+ ColumnSelection.applyToSchema(
+ schemaName = "microbatch",
+ schema = userColumnsInMicrobatchSchema,
+ columnSelection = changeArgs.columnSelection,
+ caseSensitive = caseSensitiveColumnComparison
+ )
+
+ // In addition to the explicit user-selected columns, re-project the
operational CDC metadata
+ // column as the last column.
+ val finalColumnsInMicrobatchToSelect =
+ userSelectedColumnsInMicrobatchSchema.fieldNames.map(colName => {
+ // Spark drops backticks in the schema, quote all identifiers for
safety before executing
+ // select. Identifiers could have special characters such as '.'.
+ F.col(QuotingUtils.quoteIdentifier(colName))
+ }) :+ F.col(
+ Scd1BatchProcessor.cdcMetadataColName
+ )
+
+ microbatchWithCdcMetadataDf.select(
+ finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _*
+ )
+ }
+
private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit
= {
val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
val resolver = microbatchSqlConf.resolver
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index 1cb348316436..a49c89e35755 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -27,6 +27,20 @@ import org.apache.spark.sql.types._
class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
+ /**
+ * Test Schema for a microbatch that already has the SCD1 CDC metadata
column projected.
+ */
+ private val microbatchWithCdcMetadataSchema: StructType = new StructType()
+ .add("id", IntegerType)
+ .add("name", StringType)
+ .add("age", IntegerType)
+ .add(
+ Scd1BatchProcessor.cdcMetadataColName,
+ new StructType()
+ .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+ .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
+ )
+
/** Build a microbatch [[DataFrame]] from explicit rows and an explicit
schema. */
private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
@@ -715,4 +729,207 @@ class Scd1BatchProcessorSuite extends QueryTest with
SharedSparkSession {
)
}
}
+
+ test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC
metadata column " +
+ "when columnSelection is None") {
+ val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+ Row(1, "alice", 30, Row(null, 10L)),
+ Row(2, "bob", 25, Row(20L, null))
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ columnSelection = None
+ ),
+ resolvedSequencingType = LongType
+ )
+
+ val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+ // None selection is no-op on the user columns, and the CDC metadata
column is unconditionally
+ // re-projected last, so the output shape exactly matches the input.
+ assert(result.schema.fieldNames.toSeq ==
microbatchWithCdcMetadataSchema.fieldNames.toSeq)
+ checkAnswer(
+ df = result,
+ expectedAnswer = Seq(
+ Row(1, "alice", 30, Row(null, 10L)),
+ Row(2, "bob", 25, Row(20L, null))
+ )
+ )
+ }
+
+ test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column
even when " +
+ "IncludeColumns does not contain it") {
+ val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+ Row(1, "alice", 30, Row(null, 10L))
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
+ )
+ )
+ ),
+ resolvedSequencingType = LongType
+ )
+
+ val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+ assert(result.schema.fieldNames.toSeq ==
+ Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+ checkAnswer(
+ df = result,
+ expectedAnswer = Row(1, 30, Row(null, 10L))
+ )
+ }
+
+ test("projectTargetColumnsOntoMicrobatch respects exclude column") {
+ val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+ Row(1, "alice", 30, Row(null, 10L))
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ columnSelection = Some(
+ ColumnSelection.ExcludeColumns(
+ Seq(UnqualifiedColumnName("age"))
+ )
+ )
+ ),
+ resolvedSequencingType = LongType
+ )
+
+ val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+ assert(
+ result.schema.fieldNames.toSeq ==
+ Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
+ )
+ checkAnswer(
+ df = result,
+ expectedAnswer = Row(1, "alice", Row(null, 10L))
+ )
+ }
+
+ test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema
order") {
+ val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+ Row(1, "alice", 30, Row(null, 10L))
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ // User specifies (age, id) -- intentionally different from the schema
order (id, age).
+ columnSelection = Some(ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
+ ))
+ ),
+ resolvedSequencingType = LongType
+ )
+
+ val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+ // Output column order follows the original microbatch schema (id before
age), not the order
+ // in which the user listed columns in IncludeColumns. The CDC metadata
column is appended
+ // last as always.
+ assert(result.schema.fieldNames.toSeq ==
+ Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+
+ checkAnswer(
+ df = result,
+ expectedAnswer = Row(1, 30, Row(null, 10L))
+ )
+ }
+
+ test("projectTargetColumnsOntoMicrobatch handles backticked column names
containing a " +
+ "literal dot") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ // Even if a column is created with backticks via DDL, those backticks
are consumed by Spark
+ // before resolving the schema; they won't show up in the schema field.
+ .add("user.id", StringType)
+ .add(
+ Scd1BatchProcessor.cdcMetadataColName,
+ new StructType()
+ .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+ .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType))
+
+ val batch = microbatchOf(schema)(
+ Row(1, "u-100", Row(null, 10L))
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(
+ UnqualifiedColumnName("id"),
+ UnqualifiedColumnName("`user.id`")
+ )
+ )
+ )
+ ),
+ resolvedSequencingType = LongType
+ )
+
+ val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+ assert(result.schema.fieldNames.toSeq ==
+ Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName))
+ checkAnswer(
+ df = result,
+ expectedAnswer = Row(1, "u-100", Row(null, 10L))
+ )
+ }
+
+ test("projectTargetColumnsOntoMicrobatch resolves columnSelection
case-insensitively " +
+ "when SQLConf.CASE_SENSITIVE=false") {
+ withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+ val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+ Row(1, "alice", 30, Row(null, 10L))
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1,
+ // User columns intentionally use a different case than the schema
(id, age).
+ columnSelection = Some(
+ ColumnSelection.IncludeColumns(
+ Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
+ )
+ )
+ ),
+ resolvedSequencingType = LongType
+ )
+
+ val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+ // Output column names follow the microbatch schema's casing, not the
casing in the user's
+ // columnSelection. The CDC metadata column is appended last as always.
+ assert(result.schema.fieldNames.toSeq ==
+ Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+ checkAnswer(
+ df = result,
+ expectedAnswer = Row(1, 30, Row(null, 10L))
+ )
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]