This is an automated email from the ASF dual-hosted git repository.

dbtsai pushed a commit to branch branch-4.x
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-4.x by this push:
     new 5c81510bf440 [SPARK-56856][SDP] Implement SCD1 Batch Processor; Microbatch Deduplication
5c81510bf440 is described below

commit 5c81510bf44014645cf3aa5bfb476e6899d3564d
Author: AnishMahto <[email protected]>
AuthorDate: Thu May 21 11:22:01 2026 -0700

    [SPARK-56856][SDP] Implement SCD1 Batch Processor; Microbatch Deduplication
    
    Approved AutoCDC SPIP: https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
    
    --------
    
    **Preamble:**
    
    The SCD type 1 flow is a foreachBatch streaming query on an input 
change-data-feed, and is responsible for reconciling the incoming change data 
onto some target table that follows SCD1 replication semantics.
    
    SCD1 flows also maintain an "auxiliary" table to keep track of the state of
    early-arriving, out-of-order events. Each microbatch must also be reconciled
    against this auxiliary table, and must update the auxiliary table's state
    appropriately for future microbatches.
    
    **Microbatch Deduplication:**
    
    The first step of microbatch reconciliation for SCD1 is deduplicating the 
microbatch such that there is a single row per key.
    
    Since SCD1 is only concerned with maintaining the latest state per key from the
    change data source, within a microbatch we only care about the row with the
    latest sequencing value per key; all other rows for that key are dropped.
    
    Closes #55969 from AnishMahto/SPARK-56856-SCD1-microbatch-deduplication.
    
    Authored-by: AnishMahto <[email protected]>
    Signed-off-by: DB Tsai <[email protected]>
    (cherry picked from commit 1869394580e2512b60af2fb582149414f0a791a1)
    Signed-off-by: DB Tsai <[email protected]>
---
 .../src/main/resources/error/error-conditions.json |  18 +
 .../spark/sql/pipelines/autocdc/ChangeArgs.scala   | 177 +++++++++
 .../sql/pipelines/autocdc/Scd1BatchProcessor.scala |  67 ++++
 .../sql/pipelines/autocdc/ChangeArgsSuite.scala    | 389 ++++++++++++++++++
 .../autocdc/Scd1BatchProcessorSuite.scala          | 434 +++++++++++++++++++++
 5 files changed, 1085 insertions(+)

diff --git a/common/utils/src/main/resources/error/error-conditions.json 
b/common/utils/src/main/resources/error/error-conditions.json
index abc3b284df31..34e48dec668a 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -191,6 +191,24 @@
     ],
     "sqlState" : "0A000"
   },
+  "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA" : {
+    "message" : [
+      "Using <caseSensitivity> column name comparison, the following columns 
are not present in the <schemaName> schema: <missingColumns>. Available 
columns: <availableColumns>."
+    ],
+    "sqlState" : "42703"
+  },
+  "AUTOCDC_EMPTY_KEYS" : {
+    "message" : [
+      "AutoCDC requires at least one key column to identify rows, but received 
an empty key set."
+    ],
+    "sqlState" : "22023"
+  },
+  "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
+    "message" : [
+      "Expected a single column identifier; got the multi-part identifier 
<columnName> (parts: <nameParts>)."
+    ],
+    "sqlState" : "42703"
+  },
   "AVRO_CANNOT_WRITE_NULL_FIELD" : {
     "message" : [
       "Cannot write null value for field <name> defined as non-null Avro data 
type <dataType>.",
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
new file mode 100644
index 000000000000..c17c89967baa
--- /dev/null
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{AnalysisException, Column}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.types.StructType
+
+/**
+ * A single, unqualified column identifier (no nested path or table/alias qualifier). Backticks
+ * are consumed during parsing: "`a.b`" is stored as "a.b" in [[name]]. Use [[name]] for direct
+ * comparison against schema field names and [[quoted]] for APIs that re-parse identifier strings.
+ */
+case class UnqualifiedColumnName private (name: String) {
+  def quoted: String = QuotingUtils.quoteIdentifier(name)
+}
+
+object UnqualifiedColumnName {
+  def apply(input: String): UnqualifiedColumnName = {
+    val nameParts = CatalystSqlParser.parseMultipartIdentifier(input)
+    if (nameParts.length != 1) {
+      throw multipartColumnIdentifierError(input, nameParts)
+    }
+    new UnqualifiedColumnName(nameParts.head)
+  }
+
+  private def multipartColumnIdentifierError(
+      columnName: String,
+      nameParts: Seq[String]
+  ): AnalysisException =
+    new AnalysisException(
+      errorClass = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      messageParameters = Map(
+        "columnName" -> columnName,
+        "nameParts" -> nameParts.mkString(", ")
+      )
+    )
+}
+
+sealed trait ColumnSelection
+object ColumnSelection {
+
+  case class IncludeColumns(columns: Seq[UnqualifiedColumnName]) extends 
ColumnSelection
+  case class ExcludeColumns(columns: Seq[UnqualifiedColumnName])
+      extends ColumnSelection
+
+  /**
+   * Applies [[ColumnSelection]] to a [[StructType]] and returns the filtered 
schema. Field order
+   * follows the original schema; only matching fields are retained in the 
returned schema.
+   *
+   * @param schemaName      Logical name of the schema being filtered, 
surfaced in error messages
+   *                        when columns are not found (e.g. "microbatch", 
"target").
+   * @param schema          The schema to filter.
+   * @param columnSelection The user-provided selection. `None` is a no-op and 
returns `schema`
+   *                        unchanged.
+   * @param caseSensitive   Whether to match column names case-sensitively 
against the schema.
+   *                        Callers should derive this from the session, e.g.
+   *                        `session.sessionState.conf.caseSensitiveAnalysis`, 
so column matching
+   *                        stays consistent with `spark.sql.caseSensitive`.
+   */
+  def applyToSchema(
+      schemaName: String,
+      schema: StructType,
+      columnSelection: Option[ColumnSelection],
+      caseSensitive: Boolean): StructType = columnSelection match {
+    case None =>
+      // A None column selection is interpreted as a no-op.
+      schema
+    case Some(IncludeColumns(cols)) =>
+      val keepIndices = lookupFieldIndices(schemaName, schema, cols, 
caseSensitive)
+      StructType(schema.fields.zipWithIndex.collect {
+        case (field, idx) if keepIndices.contains(idx) => field
+      })
+    case Some(ExcludeColumns(cols)) =>
+      val dropIndices = lookupFieldIndices(schemaName, schema, cols, 
caseSensitive)
+      StructType(schema.fields.zipWithIndex.collect {
+        case (field, idx) if !dropIndices.contains(idx) => field
+      })
+  }
+
+  private def lookupFieldIndices(
+      schemaName: String,
+      schema: StructType,
+      fields: Seq[UnqualifiedColumnName],
+      caseSensitive: Boolean): Set[Int] = {
+    val caseAwareGetFieldIndex: String => Option[Int] =
+      if (caseSensitive) schema.getFieldIndex else 
schema.getFieldIndexCaseInsensitive
+
+    val fieldIndexResolutions = fields.map(f => f -> 
caseAwareGetFieldIndex(f.name))
+    val missingFieldNames = fieldIndexResolutions.collect { case (f, None) => 
f.name }.distinct
+    if (missingFieldNames.nonEmpty) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+        messageParameters = Map(
+          "caseSensitivity" -> CaseSensitivityLabels.of(caseSensitive),
+          "schemaName" -> schemaName,
+          "missingColumns" -> missingFieldNames.mkString(", "),
+          "availableColumns" -> schema.fieldNames.mkString(", ")
+        )
+      )
+    }
+    fieldIndexResolutions.flatMap { case (_, idx) => idx }.toSet
+  }
+}
+
+/** User-facing case-sensitivity labels surfaced in AutoCDC error messages. */
+private[autocdc] object CaseSensitivityLabels {
+  val CaseSensitive: String = "case-sensitive"
+  val CaseInsensitive: String = "case-insensitive"
+
+  def of(caseSensitive: Boolean): String =
+    if (caseSensitive) CaseSensitive else CaseInsensitive
+}
+
+/** The SCD (Slowly Changing Dimension) strategy for a CDC flow. */
+sealed trait ScdType
+
+object ScdType {
+  /** Representation for the standard SCD1 strategy. */
+  case object Type1 extends ScdType
+  /** Representation for the standard SCD2 strategy. */
+  case object Type2 extends ScdType
+}
+
+/**
+ * Configuration for an AutoCDC flow.
+ *
+ * @param keys            The column(s) that uniquely identify a row in the 
source data.
+ * @param sequencing      Expression ordering CDC events to correctly resolve 
out-of-order
+ *                        arrivals. Must be a sortable type.
+ * @param deleteCondition Expression that marks a source row as a DELETE. When 
None, all
+ *                        rows are treated as upserts.
+ * @param storedAsScdType The SCD strategy these args should be applied to.
+ * @param columnSelection Which source columns to select in the target table. 
None means
+ *                        all columns.
+ */
+case class ChangeArgs(
+    keys: Seq[UnqualifiedColumnName],
+    sequencing: Column,
+    storedAsScdType: ScdType,
+    deleteCondition: Option[Column] = None,
+    columnSelection: Option[ColumnSelection] = None
+) {
+  ChangeArgs.validateNonEmptyKeys(keys)
+}
+
+object ChangeArgs {
+  /**
+   * Validates that [[ChangeArgs.keys]] is non-empty. Both SCD1 and SCD2 
semantics require at
+   * least one key column to identify rows; rejecting empty key sets at 
construction lets
+   * downstream consumers rely on `keys.nonEmpty` without re-validating.
+   */
+  private def validateNonEmptyKeys(keys: Seq[UnqualifiedColumnName]): Unit = {
+    if (keys.isEmpty) {
+      throw new AnalysisException(
+        errorClass = "AUTOCDC_EMPTY_KEYS",
+        messageParameters = Map.empty
+      )
+    }
+  }
+}
diff --git 
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
new file mode 100644
index 000000000000..f87a4a1da53d
--- /dev/null
+++ 
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F}
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying with the specified
+ * [[changeArgs]] configuration.
+ */
+case class Scd1BatchProcessor(changeArgs: ChangeArgs) {
+  /**
+   * Deduplicate the incoming CDC microbatch by key, keeping the most recent 
event per key
+   * as ordered by [[ChangeArgs.sequencing]].
+   *
+   * For SCD1 we only care about the most recent (by sequence value) event per 
key. When
+   * multiple events share the same key and the same sequence value, the row 
selected is
+   * non-deterministic and undefined.
+   *
+   * @param validatedMicrobatch A microbatch that has already been validated so that the
+   *                            sequencing column contains no null values and its data type
+   *                            supports ordering.
+   * @return The deduplicated microbatch, with the same column names and data types, in the
+   *         same order, as the input microbatch. Field nullability and metadata may differ
+   *         because each row is packed into and unpacked from a struct.
+   */
+  def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = {
+    // The `max_by` API can only return a single column, so pack/unpack the 
entire row into a
+    // temporary column before and after the `max_by` operation.
+    val winningRowCol = Scd1BatchProcessor.winningRowColName
+
+    val allMicrobatchColumns =
+      validatedMicrobatch.columns
+        .map(colName => F.col(QuotingUtils.quoteIdentifier(colName)))
+        .toImmutableArraySeq
+
+    validatedMicrobatch
+      .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*)
+      .agg(
+        F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing)
+          .as(winningRowCol)
+      )
+      .select(F.col(s"$winningRowCol.*"))
+  }
+}
+
+object Scd1BatchProcessor {
+  // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP 
AutoCDC processing.
+  private[autocdc] val winningRowColName = "__spark_autocdc_winning_row"
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
new file mode 100644
index 000000000000..1de2120a8f91
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{functions => F, AnalysisException, Row}
+import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, StringType, StructType}
+
+class ChangeArgsSuite extends SparkFunSuite with SharedSparkSession {
+
+  private val sourceSchema = new StructType()
+    .add("id", IntegerType, nullable = false)
+    .add("Name", StringType)
+    .add("age", IntegerType)
+
+  test("ColumnSelection None leaves schema unchanged") {
+    assert(
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = sourceSchema,
+        columnSelection = None,
+        caseSensitive = true
+      ) == sourceSchema)
+  }
+
+  test("ColumnSelection IncludeColumns(Seq()) returns an empty schema") {
+    // An explicit empty include-list is semantically distinct from None: it 
means "select
+    // no columns" and produces an empty StructType, not the original schema.
+    assert(
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = sourceSchema,
+        columnSelection = Some(ColumnSelection.IncludeColumns(Seq.empty)),
+        caseSensitive = true
+      ) == new StructType())
+  }
+
+  test("ColumnSelection ExcludeColumns(Seq()) leaves schema unchanged") {
+    // An empty exclude-list is a no-op: nothing to remove, so the original 
schema is
+    // returned unchanged (same observable behavior as None for this case).
+    assert(
+      ColumnSelection.applyToSchema(
+        schemaName = "test",
+        schema = sourceSchema,
+        columnSelection = Some(ColumnSelection.ExcludeColumns(Seq.empty)),
+        caseSensitive = true
+      ) == sourceSchema)
+  }
+
+  test("ColumnSelection IncludeColumns filters by exact name in schema order") 
{
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("age"), UnqualifiedColumnName("Name"))
+        )
+      ),
+      caseSensitive = true
+    )
+
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection ExcludeColumns filters by exact name") {
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("id")))
+      ),
+      caseSensitive = true
+    )
+
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection IncludeColumns fails for columns not present in 
schema") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          schemaName = "test",
+          schema = sourceSchema,
+          // Under caseSensitive = true, "name" will not match the schema 
field "Name".
+          columnSelection = Some(
+            ColumnSelection.IncludeColumns(
+              Seq(UnqualifiedColumnName("name"), 
UnqualifiedColumnName("missing"))
+            )
+          ),
+          caseSensitive = true
+        )
+      },
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "name, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("ColumnSelection ExcludeColumns fails for columns not present in 
schema") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          schemaName = "test",
+          schema = sourceSchema,
+          // Under caseSensitive = true, "NAME" will not match the schema 
field "Name".
+          columnSelection = Some(
+            ColumnSelection.ExcludeColumns(
+              Seq(UnqualifiedColumnName("NAME"), 
UnqualifiedColumnName("missing"))
+            )
+          ),
+          caseSensitive = true
+        )
+      },
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseSensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "NAME, missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("ColumnSelection IncludeColumns matches case-insensitively under 
caseSensitive=false") {
+    // "NAME" and "AGE" do not exactly match the schema fields "Name" and 
"age", but
+    // caseSensitive = false folds both sides to lowercase before comparing.
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("AGE"), UnqualifiedColumnName("NAME"))
+        )
+      ),
+      caseSensitive = false
+    )
+
+    // The retained fields keep their original casing from the schema, not the 
user's input.
+    assert(filteredSchema == new StructType()
+      .add("Name", StringType)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection deduplicates user-provided columns that normalize to 
the same name") {
+    // Under caseSensitive = false, "name" and "NAME" both fold to "name" and 
refer to the same
+    // schema field. The returned schema must include "Name" once, not twice. 
Output ordering
+    // and casing follow the schema, not the user's input.
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(
+          Seq(UnqualifiedColumnName("name"), UnqualifiedColumnName("NAME"))
+        )
+      ),
+      caseSensitive = false
+    )
+
+    assert(filteredSchema == new StructType().add("Name", StringType))
+  }
+
+  test("ColumnSelection ExcludeColumns matches case-insensitively under 
caseSensitive=false") {
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.ExcludeColumns(Seq(UnqualifiedColumnName("name")))
+      ),
+      caseSensitive = false
+    )
+
+    assert(filteredSchema == new StructType()
+      .add("id", IntegerType, nullable = false)
+      .add("age", IntegerType))
+  }
+
+  test("ColumnSelection missing-column error under caseSensitive=false 
preserves user casing") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ColumnSelection.applyToSchema(
+          schemaName = "test",
+          schema = sourceSchema,
+          // "NAME" matches "Name" under caseSensitive=false, but "Missing" 
has no schema match.
+          // The error message reports the user's original casing for the 
missing column and
+          // the schema's original casing for the available columns.
+          columnSelection = Some(
+            ColumnSelection.IncludeColumns(
+              Seq(UnqualifiedColumnName("NAME"), 
UnqualifiedColumnName("Missing"))
+            )
+          ),
+          caseSensitive = false
+        )
+      },
+      condition = "AUTOCDC_COLUMNS_NOT_FOUND_IN_SCHEMA",
+      sqlState = "42703",
+      parameters = Map(
+        "caseSensitivity" -> CaseSensitivityLabels.CaseInsensitive,
+        "schemaName" -> "test",
+        "missingColumns" -> "Missing",
+        "availableColumns" -> "id, Name, age"
+      )
+    )
+  }
+
+  test("UnqualifiedColumnName accepts a simple single-part identifier") {
+    assert(UnqualifiedColumnName("col").name == "col")
+    // .quoted always wraps in back-ticks, even when the input had none.
+    assert(UnqualifiedColumnName("col").quoted == "`col`")
+  }
+
+  test("UnqualifiedColumnName accepts a backtick-quoted name containing a 
literal dot") {
+    // Backticks make the dot part of a single name part, so this passes 
validation. The
+    // stored name is the parsed (unquoted) form so it matches the actual 
schema field name.
+    assert(UnqualifiedColumnName("`a.b`").name == "a.b")
+    // .quoted re-wraps the parsed name in back-ticks, round-tripping back to 
the input form.
+    assert(UnqualifiedColumnName("`a.b`").quoted == "`a.b`")
+  }
+
+  test("UnqualifiedColumnName accepts redundant backticks around a single-part 
name") {
+    // Backticks around an already-single-part identifier are decorative; the 
parser strips them
+    // so the stored name has no surrounding back-ticks.
+    assert(UnqualifiedColumnName("`col`").name == "col")
+    // .quoted re-wraps the parsed name in back-ticks, round-tripping back to 
the input form.
+    assert(UnqualifiedColumnName("`col`").quoted == "`col`")
+  }
+
+  test("UnqualifiedColumnName.quoted is safe to pass to functions.col for 
literal-dot names") {
+    val schema = new StructType()
+      .add("a.b", IntegerType)
+      .add("c", IntegerType)
+
+    val df = spark.createDataFrame(
+      spark.sparkContext.parallelize(Seq(Row(1, 2), Row(3, 4))),
+      schema
+    )
+
+    val key = UnqualifiedColumnName("`a.b`")
+
+    // Sanity-check: the unquoted `name` is not safe to pass to 
`functions.col`. The string is
+    // re-parsed and the literal dot is interpreted as a nested-field path 
separator, so the
+    // analyzer fails to resolve `a`.`b` against the available top-level 
columns.
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select(F.col(key.name)).collect()
+      },
+      condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      sqlState = "42703",
+      parameters = Map(
+        "objectName" -> "`a`.`b`",
+        "proposal" -> "`a.b`, `c`"
+      ),
+      context = ExpectedContext(
+        fragment = "col",
+        callSitePattern = ""
+      )
+    )
+
+    // The `quoted` form wraps the name in back-ticks so the re-parser treats 
the whole thing
+    // as a single identifier, resolving to the top-level "a.b" column.
+    assert(df.select(F.col(key.quoted)).collect().toSeq == Seq(Row(1), Row(3)))
+  }
+
+  test("IncludeColumns correctly matches a backtick-quoted literal-dot 
column") {
+    val schema = new StructType()
+      .add("a.b", IntegerType)
+      .add("c", StringType)
+
+    // The user writes `a.b` to refer to the literal-dot column "a.b" in the 
schema. After
+    // construction, the [[UnqualifiedColumnName]] holds "a.b", which matches 
the field name
+    // exactly and the column is included in the filtered schema.
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = schema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`a.b`")))
+      ),
+      caseSensitive = true
+    )
+
+    assert(filteredSchema == new StructType().add("a.b", IntegerType))
+  }
+
+  test("IncludeColumns correctly matches a backtick-quoted mixed-case column") 
{
+    val filteredSchema = ColumnSelection.applyToSchema(
+      schemaName = "test",
+      schema = sourceSchema,
+      columnSelection = Some(
+        ColumnSelection.IncludeColumns(Seq(UnqualifiedColumnName("`Name`")))
+      ),
+      caseSensitive = true
+    )
+
+    assert(filteredSchema == new StructType().add("Name", StringType))
+  }
+
+  test("UnqualifiedColumnName rejects a dotted (multi-part) identifier") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        UnqualifiedColumnName("a.b")
+      },
+      condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      sqlState = "42703",
+      parameters = Map(
+        "columnName" -> "a.b",
+        "nameParts" -> "a, b"
+      )
+    )
+  }
+
+  test("UnqualifiedColumnName rejects a qualified column reference") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        UnqualifiedColumnName("src.x")
+      },
+      condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      sqlState = "42703",
+      parameters = Map(
+        "columnName" -> "src.x",
+        "nameParts" -> "src, x"
+      )
+    )
+  }
+
+  test("UnqualifiedColumnName rejects an identifier with three or more parts") 
{
+    checkError(
+      exception = intercept[AnalysisException] {
+        UnqualifiedColumnName("a.b.c")
+      },
+      condition = "AUTOCDC_MULTIPART_COLUMN_IDENTIFIER",
+      sqlState = "42703",
+      parameters = Map(
+        "columnName" -> "a.b.c",
+        "nameParts" -> "a, b, c"
+      )
+    )
+  }
+
+  test("ChangeArgs rejects an empty key list") {
+    checkError(
+      exception = intercept[AnalysisException] {
+        ChangeArgs(
+          keys = Seq.empty,
+          sequencing = F.col("seq"),
+          storedAsScdType = ScdType.Type1
+        )
+      },
+      condition = "AUTOCDC_EMPTY_KEYS",
+      sqlState = "22023",
+      parameters = Map.empty
+    )
+  }
+
+  test("UnqualifiedColumnName lets a ParseException from the SQL parser 
propagate") {
+    checkError(
+      exception = intercept[ParseException] {
+        UnqualifiedColumnName("")
+      },
+      condition = "PARSE_EMPTY_STATEMENT",
+      sqlState = Some("42617")
+    )
+  }
+}
diff --git 
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
new file mode 100644
index 000000000000..208c0aa1e4c5
--- /dev/null
+++ 
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
+
+  /** Build a microbatch [[DataFrame]] from explicit rows and an explicit schema. */
+  private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
+    spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+
+  /**
+   * Returns the `(name, dataType)` pairs of `schema`'s fields. Used to compare two schemas for
+   * structural equivalence while deliberately ignoring nullability and metadata, which can shift
+   * benignly when columns are unpacked from a struct.
+   */
+  private def columnNamesAndDataTypes(schema: StructType): Seq[(String, DataType)] =
+    schema.fields.map(f => (f.name, f.dataType)).toSeq
+
+  test("deduplicateMicrobatch keeps only the row with the largest sequence value per key") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "first"),
+      Row(1, 30L, "winner"),
+      Row(1, 20L, "middle")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(1, 30L, "winner")
+    )
+  }
+
+  test("deduplicateMicrobatch is no-op if there's a single event for a key") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "only-row")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(1, 10L, "only-row")
+    )
+  }
+
+  test("deduplicateMicrobatch handles equal sequencing values for the same key") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "first-tied-row"),
+      Row(1, 10L, "second-tied-row")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    // On equal sequence number events for the same key we provide no guarantee on which event will
+    // survive, but the contract is _one_ event will survive - assert that below.
+    val result = processor.deduplicateMicrobatch(batch).collect()
+    assert(result.length == 1)
+    assert(result.head.getInt(0) == 1)
+    assert(result.head.getLong(1) == 10L)
+    assert(Set("first-tied-row", "second-tied-row").contains(result.head.getString(2)))
+  }
+
+  test("deduplicateMicrobatch ignores rows with null sequencing when a non-null value exists") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      // In production the expectation is the microbatch will have been validated to not contain
+      // any null sequence values, but demonstrate that null sequence rows are de-prioritized in
+      // deduplication.
+      Row(1, null, "null-sequence"),
+      Row(1, 10L, "non-null-sequence")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(1, 10L, "non-null-sequence")
+    )
+  }
+
+  test(
+    "deduplicateMicrobatch returns a null row when all sequencing values for a key are null"
+  ) {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+    val batch = microbatchOf(schema)(
+      // In production the expectation is the microbatch will have been validated to not contain
+      // any null sequence values, but demonstrate that a null row will be returned by
+      // deduplication if all rows contain a null sequence in the microbatch.
+      Row(1, null, "null-sequence")
+    )
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(null, null, null)
+    )
+  }
+
+  test("deduplicateMicrobatch processes multiple keys independently") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "a1"),
+      Row(2, 50L, "b1-winner"),
+      Row(1, 20L, "a2-winner"),
+      Row(2, 40L, "b2-loser"),
+      Row(3, 1L, "c1-only")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Seq(
+        Row(1, 20L, "a2-winner"),
+        Row(2, 50L, "b1-winner"),
+        Row(3, 1L, "c1-only")
+      )
+    )
+  }
+
+  test("deduplicateMicrobatch carries non-key, non-sequence columns from the winning row") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("name", StringType)
+      .add("amount", IntegerType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "old-name", 100),
+      Row(1, 20L, "winning-name", 200)
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    // All non-key columns must come from the row with the largest sequence value, never
+    // a mix of values from multiple rows.
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(1, 20L, "winning-name", 200)
+    )
+  }
+
+  test("deduplicateMicrobatch carries nested columns correctly from the winning row") {
+    val payloadType = new StructType()
+      .add("name", StringType)
+      .add("amount", IntegerType)
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("payload", payloadType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, Row("old", 100)),
+      Row(1, 20L, Row("new", 200))
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(1, 20L, Row("new", 200))
+    )
+  }
+
+  test("deduplicateMicrobatch supports composite (multi-column) keys") {
+    val schema = new StructType()
+      .add("region", StringType)
+      .add("customer_id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row("US", 1, 10L, "us1-old"),
+      Row("US", 1, 20L, "us1-new"),
+      // Same customer_id as above but different region: independent group.
+      Row("EU", 1, 5L, "eu1-only"),
+      // Same region as above but different customer_id: independent group.
+      Row("US", 2, 99L, "us2-only")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("region"), UnqualifiedColumnName("customer_id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Seq(
+        Row("US", 1, 20L, "us1-new"),
+        Row("EU", 1, 5L, "eu1-only"),
+        Row("US", 2, 99L, "us2-only")
+      )
+    )
+  }
+
+  test("deduplicateMicrobatch supports an arbitrary sequencing expression") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("alt_seq", LongType)
+      .add("value", StringType)
+
+    // The sequencing expression is a function call referencing multiple columns, not a bare
+    // identifier. Locks in that `max_by(..., changeArgs.sequencing)` evaluates the full
+    // expression per-row rather than treating `sequencing` as a single column reference.
+    val batch = microbatchOf(schema)(
+      // greatest(10, 30) = 30 - winner under the expression.
+      Row(1, 10L, 30L, "winner"),
+      // greatest(25, 20) = 25 - would win under `seq` alone, but loses under `greatest`.
+      Row(1, 25L, 20L, "would-win-on-seq-alone"),
+      Row(1, 15L, 15L, "always-loses")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.greatest(F.col("seq"), F.col("alt_seq")),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(1, 10L, 30L, "winner")
+    )
+  }
+
+  test("deduplicateMicrobatch supports literal-dot column names") {
+    val schema = new StructType()
+      .add("user.id", IntegerType)
+      .add("seq", LongType)
+      .add("event.value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row(1, 10L, "old"),
+      Row(1, 20L, "new")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("`user.id`")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkAnswer(
+      df = processor.deduplicateMicrobatch(batch),
+      expectedAnswer = Row(1, 20L, "new")
+    )
+  }
+
+  test(
+    "deduplicateMicrobatch fails when a key column collides with the reserved name"
+  ) {
+    val reservedColName = Scd1BatchProcessor.winningRowColName
+
+    val schema = new StructType()
+      .add(reservedColName, StringType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)(
+      Row("k1", 10L, "loser"),
+      Row("k1", 20L, "winner")
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName(reservedColName)),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    checkError(
+      exception = intercept[AnalysisException] {
+        processor.deduplicateMicrobatch(batch).collect()
+      },
+      condition = "AMBIGUOUS_REFERENCE",
+      sqlState = "42704",
+      parameters = Map(
+        "name" -> s"`$reservedColName`",
+        "referenceNames" -> s"[`$reservedColName`, `$reservedColName`]"
+      ),
+      context = ExpectedContext(fragment = "col", callSitePattern = "")
+    )
+  }
+
+  test("deduplicateMicrobatch preserves the input column names, types, and ordering") {
+    val schema = new StructType()
+      .add("a", StringType)
+      .add("id", IntegerType)
+      .add("z", DoubleType)
+      .add("seq", LongType)
+      .add("flag", BooleanType)
+
+    val batch = microbatchOf(schema)(
+      Row("a1", 1, 1.5, 10L, true),
+      Row("a2", 1, 2.5, 20L, false)
+    )
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    // Field names and dataTypes must match the input exactly, in the original order.
+    assert(
+      columnNamesAndDataTypes(processor.deduplicateMicrobatch(batch).schema) ==
+        columnNamesAndDataTypes(schema))
+  }
+
+  test("deduplicateMicrobatch returns an empty DataFrame with preserved schema") {
+    val schema = new StructType()
+      .add("id", IntegerType)
+      .add("seq", LongType)
+      .add("value", StringType)
+
+    val batch = microbatchOf(schema)()
+
+    val processor = Scd1BatchProcessor(
+      changeArgs = ChangeArgs(
+        keys = Seq(UnqualifiedColumnName("id")),
+        sequencing = F.col("seq"),
+        storedAsScdType = ScdType.Type1
+      )
+    )
+
+    val result = processor.deduplicateMicrobatch(batch)
+    assert(result.collect().isEmpty)
+    assert(columnNamesAndDataTypes(result.schema) == columnNamesAndDataTypes(schema))
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to