This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new af2cfc0b7fb5 [SPARK-56882][SDP] Implement SCD1 Batch Processor; Target Column Projection
af2cfc0b7fb5 is described below

commit af2cfc0b7fb5aee1b04b6e9592bd7e3bff39c66b
Author: AnishMahto <[email protected]>
AuthorDate: Fri May 22 13:39:14 2026 -0700

    [SPARK-56882][SDP] Implement SCD1 Batch Processor; Target Column Projection
    
    Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
    
    --------
    
    **Preamble:**
    
    The SCD type 1 flow is a foreachBatch streaming query on an input
    change-data-feed. It is responsible for reconciling the incoming change
    data onto a target table that follows SCD1 replication semantics.
    
    SCD1 flows also maintain an "auxiliary" table that tracks the state of
    early-arriving, out-of-order events. Each microbatch must also be
    reconciled against this auxiliary table, and must update the auxiliary
    table's state appropriately for future microbatches.
    
    **Target Column Projection:**
    
    As per the SPIP and `ChangeArgs.columnSelection`, users are allowed to
    specify the set of columns that actually gets persisted in the target
    table. Any columns not selected should be dropped before the target table
    merge/persistence.
    
    We should project only these selected columns onto the microbatch so that
    its dataframe is in the correct shape prior to CDC processing and merging
    into the target table.
    
    Closes #55991 from AnishMahto/SPARK-56882-SCD1-project-target-columns-onto-microbatch.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
---
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala |  52 +++++
 .../autocdc/Scd1BatchProcessorSuite.scala          | 217 +++++++++++++++++++++
 2 files changed, 269 insertions(+)

diff --git a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
index d50c30919ba8..03aaf284f070 100644
--- a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
+++ b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -114,6 +114,58 @@ case class Scd1BatchProcessor(
     )
   }
 
+  /**
+   * Project the user-defined column selection onto the microbatch. By this 
point the input
+   * microbatch should already have projected its CDC metadata, because it's 
possible that the
+   * user-defined column selection drops columns that are otherwise necessary 
to compute the
+   * CDC metadata.
+   *
+   * Returned dataframe's schema is: all of the user-selected columns in the 
input dataframe as per
+   * [[ChangeArgs.columnSelection]] + the CDC metadata column.
+   */
+  def projectTargetColumnsOntoMicrobatch(microbatchWithCdcMetadataDf: 
DataFrame): DataFrame = {
+    val caseSensitiveColumnComparison =
+      
microbatchWithCdcMetadataDf.sparkSession.sessionState.conf.caseSensitiveAnalysis
+
+    // The user schema is the microbatch schema after dropping the system CDC 
metadata column.
+    // We project out the system column before applying user selection and 
project it back in
+    // afterwards, so that users cannot control whether this [necessary] 
column shows up in the
+    // target table.
+    val userColumnsInMicrobatchSchema = ColumnSelection.applyToSchema(
+      schemaName = "microbatch",
+      schema = microbatchWithCdcMetadataDf.schema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(
+          Seq(UnqualifiedColumnName(Scd1BatchProcessor.cdcMetadataColName))
+        )
+      ),
+      caseSensitive = caseSensitiveColumnComparison
+    )
+
+    val userSelectedColumnsInMicrobatchSchema =
+      ColumnSelection.applyToSchema(
+        schemaName = "microbatch",
+        schema = userColumnsInMicrobatchSchema,
+        columnSelection = changeArgs.columnSelection,
+        caseSensitive = caseSensitiveColumnComparison
+      )
+
+    // In addition to the explicit user-selected columns, re-project the 
operational CDC metadata
+    // column as the last column.
+    val finalColumnsInMicrobatchToSelect =
+      userSelectedColumnsInMicrobatchSchema.fieldNames.map(colName => {
+        // Spark drops backticks in the schema, quote all identifiers for 
safety before executing
+        // select. Identifiers could have special characters such as '.'.
+        F.col(QuotingUtils.quoteIdentifier(colName))
+      }) :+ F.col(
+        Scd1BatchProcessor.cdcMetadataColName
+      )
+
+    microbatchWithCdcMetadataDf.select(
+      finalColumnsInMicrobatchToSelect.toImmutableArraySeq: _*
+    )
+  }
+
   private def validateCdcMetadataColumnNotPresent(microbatch: DataFrame): Unit 
= {
     val microbatchSqlConf = microbatch.sparkSession.sessionState.conf
     val resolver = microbatchSqlConf.resolver
diff --git a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
index 1cb348316436..a49c89e35755 100644
--- a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
+++ b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -27,6 +27,20 @@ import org.apache.spark.sql.types._
 
 class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
 
+  /**
+   * Test Schema for a microbatch that already has the SCD1 CDC metadata 
column projected.
+   */
+  private val microbatchWithCdcMetadataSchema: StructType = new StructType()
+    .add("id", IntegerType)
+    .add("name", StringType)
+    .add("age", IntegerType)
+    .add(
+      Scd1BatchProcessor.cdcMetadataColName,
+      new StructType()
+        .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+        .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType)
+    )
+
   /** Build a microbatch [[DataFrame]] from explicit rows and an explicit 
schema. */
   private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
     spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
@@ -715,4 +729,207 @@ class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
       )
     }
   }
+
+  test("projectTargetColumnsOntoMicrobatch keeps every user column and the CDC 
metadata column " +
+    "when columnSelection is None") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L)),
+      Row(2, "bob", 25, Row(20L, null))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = None
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    // None selection is no-op on the user columns, and the CDC metadata 
column is unconditionally
+    // re-projected last, so the output shape exactly matches the input.
+    assert(result.schema.fieldNames.toSeq == 
microbatchWithCdcMetadataSchema.fieldNames.toSeq)
+    checkAnswer(
+      df = result,
+      expectedAnswer = Seq(
+        Row(1, "alice", 30, Row(null, 10L)),
+        Row(2, "bob", 25, Row(20L, null))
+      )
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch retains the CDC metadata column 
even when " +
+    "IncludeColumns does not contain it") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = Some(
+          ColumnSelection.IncludeColumns(
+            Seq(UnqualifiedColumnName("id"), UnqualifiedColumnName("age"))
+          )
+        )
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq ==
+      Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, 30, Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch respects exclude column") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = Some(
+          ColumnSelection.ExcludeColumns(
+            Seq(UnqualifiedColumnName("age"))
+          )
+        )
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    assert(
+      result.schema.fieldNames.toSeq ==
+        Seq("id", "name", Scd1BatchProcessor.cdcMetadataColName)
+    )
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, "alice", Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch preserves the microbatch schema 
order") {
+    val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+      Row(1, "alice", 30, Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        // User specifies (age, id) -- intentionally different from the schema 
order (id, age).
+        columnSelection = Some(ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("id"))
+        ))
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    // Output column order follows the original microbatch schema (id before 
age), not the order
+    // in which the user listed columns in IncludeColumns. The CDC metadata 
column is appended
+    // last as always.
+    assert(result.schema.fieldNames.toSeq ==
+      Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, 30, Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch handles backticked column names 
containing a " +
+    "literal dot") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      // Even if a column is created with backticks via DDL, those backticks 
are consumed by Spark
+      // before resolving the schema; they won't show up in the schema field.
+      .add("user.id", StringType)
+      .add(
+        Scd1BatchProcessor.cdcMetadataColName,
+        new StructType()
+          .add(Scd1BatchProcessor.cdcDeleteSequenceFieldName, LongType)
+          .add(Scd1BatchProcessor.cdcUpsertSequenceFieldName, LongType))
+
+    val batch = microbatchOf(schema)(
+      Row(1, "u-100", Row(null, 10L))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1,
+        columnSelection = Some(
+          ColumnSelection.IncludeColumns(
+            Seq(
+              UnqualifiedColumnName("id"),
+              UnqualifiedColumnName("`user.id`")
+            )
+          )
+        )
+      ),
+      resolvedSequencingType = LongType
+    )
+
+    val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+    assert(result.schema.fieldNames.toSeq ==
+      Seq("id", "user.id", Scd1BatchProcessor.cdcMetadataColName))
+    checkAnswer(
+      df = result,
+      expectedAnswer = Row(1, "u-100", Row(null, 10L))
+    )
+  }
+
+  test("projectTargetColumnsOntoMicrobatch resolves columnSelection 
case-insensitively " +
+    "when SQLConf.CASE_SENSITIVE=false") {
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      val batch = microbatchOf(microbatchWithCdcMetadataSchema)(
+        Row(1, "alice", 30, Row(null, 10L))
+      )
+
+      val processor = Scd1BatchProcessor(
+        changeArgs = ChangeArgs(
+          keys = Seq(UnqualifiedColumnName("id")),
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1,
+          // User columns intentionally use a different case than the schema 
(id, age).
+          columnSelection = Some(
+            ColumnSelection.IncludeColumns(
+              Seq(UnqualifiedColumnName("ID"), UnqualifiedColumnName("AGE"))
+            )
+          )
+        ),
+        resolvedSequencingType = LongType
+      )
+
+      val result = processor.projectTargetColumnsOntoMicrobatch(batch)
+
+      // Output column names follow the microbatch schema's casing, not the 
casing in the user's
+      // columnSelection. The CDC metadata column is appended last as always.
+      assert(result.schema.fieldNames.toSeq ==
+        Seq("id", "age", Scd1BatchProcessor.cdcMetadataColName))
+      checkAnswer(
+        df = result,
+        expectedAnswer = Row(1, 30, Row(null, 10L))
+      )
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to