This is an automated email from the ASF dual-hosted git repository.
dbtsai pushed a commit to branch branch-4.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-4.2 by this push:
new f070aae1d416 [SPARK-56856][SDP] Implement SCD1 Batch Processor;
Microbatch Deduplication
f070aae1d416 is described below
commit f070aae1d416fb1109908651c3f547dd26050c90
Author: AnishMahto <[email protected]>
AuthorDate: Thu May 21 11:22:01 2026 -0700
[SPARK-56856][SDP] Implement SCD1 Batch Processor; Microbatch Deduplication
Approved AutoCDC SPIP:
https://lists.apache.org/thread/j6sj9wo9odgdpgzlxtvhoy7szs0jplf7
--------
**Preamble:**
The SCD type 1 flow is a foreachBatch streaming query on an input
change-data-feed, and is responsible for reconciling the incoming change data
onto some target table that follows SCD1 replication semantics.
SCD1 flows also maintain an "auxiliary" table to keep track of
early-arriving out-of-order received events state. Each microbatch will need to
reconcile against this auxiliary table as well, and update the auxiliary
table's state appropriately for future microbatches.
**Microbatch Deduplication:**
The first step of microbatch reconciliation for SCD1 is deduplicating the
microbatch such that there is a single row per key.
Since SCD1 is only concerned with maintaining latest state per key from the
change data source, within a microbatch we only care about the row with the
latest sequencing per key - drop all other rows for that same key.
Closes #55969 from AnishMahto/SPARK-56856-SCD1-microbatch-deduplication.
Authored-by: AnishMahto <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
(cherry picked from commit 1869394580e2512b60af2fb582149414f0a791a1)
Signed-off-by: DB Tsai <[email protected]>
---
.../src/main/resources/error/error-conditions.json | 6 +
.../spark/sql/pipelines/autocdc/ChangeArgs.scala | 20 +-
.../sql/pipelines/autocdc/Scd1BatchProcessor.scala | 67 ++++
.../sql/pipelines/autocdc/ChangeArgsSuite.scala | 15 +
.../autocdc/Scd1BatchProcessorSuite.scala | 434 +++++++++++++++++++++
5 files changed, 541 insertions(+), 1 deletion(-)
diff --git a/common/utils/src/main/resources/error/error-conditions.json
b/common/utils/src/main/resources/error/error-conditions.json
index 395c5e5160f5..a63897468707 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -197,6 +197,12 @@
],
"sqlState" : "42703"
},
+ "AUTOCDC_EMPTY_KEYS" : {
+ "message" : [
+ "AutoCDC requires at least one key column to identify rows, but received
an empty key set."
+ ],
+ "sqlState" : "22023"
+ },
"AUTOCDC_MULTIPART_COLUMN_IDENTIFIER" : {
"message" : [
"Expected a single column identifier; got the multi-part identifier
<columnName> (parts: <nameParts>)."
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
index 5774781b8ab9..c17c89967baa 100644
---
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgs.scala
@@ -156,4 +156,22 @@ case class ChangeArgs(
storedAsScdType: ScdType,
deleteCondition: Option[Column] = None,
columnSelection: Option[ColumnSelection] = None
-)
+) {
+ ChangeArgs.validateNonEmptyKeys(keys)
+}
+
+object ChangeArgs {
+ /**
+ * Validates that [[ChangeArgs.keys]] is non-empty. Both SCD1 and SCD2
semantics require at
+ * least one key column to identify rows; rejecting empty key sets at
construction lets
+ * downstream consumers rely on `keys.nonEmpty` without re-validating.
+ */
+ private def validateNonEmptyKeys(keys: Seq[UnqualifiedColumnName]): Unit = {
+ if (keys.isEmpty) {
+ throw new AnalysisException(
+ errorClass = "AUTOCDC_EMPTY_KEYS",
+ messageParameters = Map.empty
+ )
+ }
+ }
+}
diff --git
a/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
new file mode 100644
index 000000000000..f87a4a1da53d
--- /dev/null
+++
b/sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F}
+import org.apache.spark.sql.catalyst.util.QuotingUtils
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.util.ArrayImplicits._
+
+/**
+ * Per-microbatch processor for SCD Type 1 AutoCDC flows, complying to the
specified [[changeArgs]]
+ * configuration.
+ */
+case class Scd1BatchProcessor(changeArgs: ChangeArgs) {
+ /**
+ * Deduplicate the incoming CDC microbatch by key, keeping the most recent
event per key
+ * as ordered by [[ChangeArgs.sequencing]].
+ *
+ * For SCD1 we only care about the most recent (by sequence value) event per
key. When
+ * multiple events share the same key and the same sequence value, the row
selected is
+ * non-deterministic and undefined.
+ *
+ * @param validatedMicrobatch A microbatch that has already been validated
such that the
+ * sequencing column should not contain null
values, and its data type
+ * should support ordering.
+ *
+ * The schema of the returned dataframe matches the schema of the microbatch
exactly.
+ */
+ def deduplicateMicrobatch(validatedMicrobatch: DataFrame): DataFrame = {
+ // The `max_by` API can only return a single column, so pack/unpack the
entire row into a
+ // temporary column before and after the `max_by` operation.
+ val winningRowCol = Scd1BatchProcessor.winningRowColName
+
+ val allMicrobatchColumns =
+ validatedMicrobatch.columns
+ .map(colName => F.col(QuotingUtils.quoteIdentifier(colName)))
+ .toImmutableArraySeq
+
+ validatedMicrobatch
+ .groupBy(changeArgs.keys.map(k => F.col(k.quoted)): _*)
+ .agg(
+ F.max_by(F.struct(allMicrobatchColumns: _*), changeArgs.sequencing)
+ .as(winningRowCol)
+ )
+ .select(F.col(s"$winningRowCol.*"))
+ }
+}
+
+object Scd1BatchProcessor {
+ // Columns prefixed with `__spark_autocdc_` are reserved for internal SDP
AutoCDC processing.
+ private[autocdc] val winningRowColName = "__spark_autocdc_winning_row"
+}
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
index 816338cb677e..1de2120a8f91 100644
---
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/ChangeArgsSuite.scala
@@ -362,6 +362,21 @@ class ChangeArgsSuite extends SparkFunSuite with
SharedSparkSession {
)
}
+ test("ChangeArgs rejects an empty key list") {
+ checkError(
+ exception = intercept[AnalysisException] {
+ ChangeArgs(
+ keys = Seq.empty,
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ },
+ condition = "AUTOCDC_EMPTY_KEYS",
+ sqlState = "22023",
+ parameters = Map.empty
+ )
+ }
+
test("UnqualifiedColumnName lets a ParseException from the SQL parser
propagate") {
checkError(
exception = intercept[ParseException] {
diff --git
a/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
new file mode 100644
index 000000000000..208c0aa1e4c5
--- /dev/null
+++
b/sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessorSuite.scala
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.{functions => F, AnalysisException, QueryTest, Row}
+import org.apache.spark.sql.classic.DataFrame
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+class Scd1BatchProcessorSuite extends QueryTest with SharedSparkSession {
+
+ /** Build a microbatch [[DataFrame]] from explicit rows and an explicit
schema. */
+ private def microbatchOf(schema: StructType)(rows: Row*): DataFrame =
+ spark.createDataFrame(spark.sparkContext.parallelize(rows), schema)
+
+ /**
+ * Returns the `(name, dataType)` pairs of `schema`'s fields. Used to
compare two schemas for
+ * structural equivalence while deliberately ignoring nullability and
metadata, which can shift
+ * benignly when columns are unpacked from a struct.
+ */
+ private def columnNamesAndDataTypes(schema: StructType): Seq[(String,
DataType)] =
+ schema.fields.map(f => (f.name, f.dataType)).toSeq
+
+ test("deduplicateMicrobatch keeps only the row with the largest sequence
value per key") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)(
+ Row(1, 10L, "first"),
+ Row(1, 30L, "winner"),
+ Row(1, 20L, "middle")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(1, 30L, "winner")
+ )
+ }
+
+ test("deduplicateMicrobatch is no-op if there's a single event for a key") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)(
+ Row(1, 10L, "only-row")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(1, 10L, "only-row")
+ )
+ }
+
+ test("deduplicateMicrobatch handles equal sequencing values for the same
key") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)(
+ Row(1, 10L, "first-tied-row"),
+ Row(1, 10L, "second-tied-row")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ // On equal sequence number events for the same key we provide no
guarantee on which event will
+ // survive, but the contract is _one_ event will survive - assert that
below.
+ val result = processor.deduplicateMicrobatch(batch).collect()
+ assert(result.length == 1)
+ assert(result.head.getInt(0) == 1)
+ assert(result.head.getLong(1) == 10L)
+ assert(Set("first-tied-row",
"second-tied-row").contains(result.head.getString(2)))
+ }
+
+ test("deduplicateMicrobatch ignores rows with null sequencing when a
non-null value exists") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)(
+ // In production the expectation is the microbatch will have been
validated to not contain
+ // any null sequence values, but demonstrate that null sequence rows are
de-prioritized in
+ // deduplication.
+ Row(1, null, "null-sequence"),
+ Row(1, 10L, "non-null-sequence")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(1, 10L, "non-null-sequence")
+ )
+ }
+
+ test(
+ "deduplicateMicrobatch returns a null row when all sequencing values for a
key are null"
+ ) {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+ val batch = microbatchOf(schema)(
+ // In production the expectation is the microbatch will have been
validated to not contain
+ // any null sequence values, but demonstrate that a null row will be
returned by
+ // deduplication if all rows contain a null sequence in the microbatch.
+ Row(1, null, "null-sequence")
+ )
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(null, null, null)
+ )
+ }
+
+ test("deduplicateMicrobatch processes multiple keys independently") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)(
+ Row(1, 10L, "a1"),
+ Row(2, 50L, "b1-winner"),
+ Row(1, 20L, "a2-winner"),
+ Row(2, 40L, "b2-loser"),
+ Row(3, 1L, "c1-only")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Seq(
+ Row(1, 20L, "a2-winner"),
+ Row(2, 50L, "b1-winner"),
+ Row(3, 1L, "c1-only")
+ )
+ )
+ }
+
+ test("deduplicateMicrobatch carries non-key, non-sequence columns from the
winning row") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("name", StringType)
+ .add("amount", IntegerType)
+
+ val batch = microbatchOf(schema)(
+ Row(1, 10L, "old-name", 100),
+ Row(1, 20L, "winning-name", 200)
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ // All non-key columns must come from the row with the largest sequence
value, never
+ // a mix of values from multiple rows.
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(1, 20L, "winning-name", 200)
+ )
+ }
+
+ test("deduplicateMicrobatch carries nested columns correctly from the
winning row") {
+ val payloadType = new StructType()
+ .add("name", StringType)
+ .add("amount", IntegerType)
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("payload", payloadType)
+
+ val batch = microbatchOf(schema)(
+ Row(1, 10L, Row("old", 100)),
+ Row(1, 20L, Row("new", 200))
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(1, 20L, Row("new", 200))
+ )
+ }
+
+ test("deduplicateMicrobatch supports composite (multi-column) keys") {
+ val schema = new StructType()
+ .add("region", StringType)
+ .add("customer_id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)(
+ Row("US", 1, 10L, "us1-old"),
+ Row("US", 1, 20L, "us1-new"),
+ // Same customer_id as above but different region: independent group.
+ Row("EU", 1, 5L, "eu1-only"),
+ // Same region as above but different customer_id: independent group.
+ Row("US", 2, 99L, "us2-only")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("region"),
UnqualifiedColumnName("customer_id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Seq(
+ Row("US", 1, 20L, "us1-new"),
+ Row("EU", 1, 5L, "eu1-only"),
+ Row("US", 2, 99L, "us2-only")
+ )
+ )
+ }
+
+ test("deduplicateMicrobatch supports an arbitrary sequencing expression") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("alt_seq", LongType)
+ .add("value", StringType)
+
+ // The sequencing expression is a function call referencing multiple
columns, not a bare
+ // identifier. Locks in that `max_by(..., changeArgs.sequencing)`
evaluates the full
+ // expression per-row rather than treating `sequencing` as a single column
reference.
+ val batch = microbatchOf(schema)(
+ // greatest(10, 30) = 30 - winner under the expression.
+ Row(1, 10L, 30L, "winner"),
+ // greatest(25, 20) = 25 - would win under `seq` alone, but loses under
`greatest`.
+ Row(1, 25L, 20L, "would-win-on-seq-alone"),
+ Row(1, 15L, 15L, "always-loses")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.greatest(F.col("seq"), F.col("alt_seq")),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(1, 10L, 30L, "winner")
+ )
+ }
+
+ test("deduplicateMicrobatch supports literal-dot column names") {
+ val schema = new StructType()
+ .add("user.id", IntegerType)
+ .add("seq", LongType)
+ .add("event.value", StringType)
+
+ val batch = microbatchOf(schema)(
+ Row(1, 10L, "old"),
+ Row(1, 20L, "new")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("`user.id`")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkAnswer(
+ df = processor.deduplicateMicrobatch(batch),
+ expectedAnswer = Row(1, 20L, "new")
+ )
+ }
+
+ test(
+ "deduplicateMicrobatch fails when a key column collides with the reserved
name"
+ ) {
+ val reservedColName = Scd1BatchProcessor.winningRowColName
+
+ val schema = new StructType()
+ .add(reservedColName, StringType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)(
+ Row("k1", 10L, "loser"),
+ Row("k1", 20L, "winner")
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName(reservedColName)),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ checkError(
+ exception = intercept[AnalysisException] {
+ processor.deduplicateMicrobatch(batch).collect()
+ },
+ condition = "AMBIGUOUS_REFERENCE",
+ sqlState = "42704",
+ parameters = Map(
+ "name" -> s"`$reservedColName`",
+ "referenceNames" -> s"[`$reservedColName`, `$reservedColName`]"
+ ),
+ context = ExpectedContext(fragment = "col", callSitePattern = "")
+ )
+ }
+
+ test("deduplicateMicrobatch preserves the input column names, types, and
ordering") {
+ val schema = new StructType()
+ .add("a", StringType)
+ .add("id", IntegerType)
+ .add("z", DoubleType)
+ .add("seq", LongType)
+ .add("flag", BooleanType)
+
+ val batch = microbatchOf(schema)(
+ Row("a1", 1, 1.5, 10L, true),
+ Row("a2", 1, 2.5, 20L, false)
+ )
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ // Field names and dataTypes must match the input exactly, in the original
order.
+ assert(
+ columnNamesAndDataTypes(processor.deduplicateMicrobatch(batch).schema) ==
+ columnNamesAndDataTypes(schema))
+ }
+
+ test("deduplicateMicrobatch returns an empty DataFrame with preserved
schema") {
+ val schema = new StructType()
+ .add("id", IntegerType)
+ .add("seq", LongType)
+ .add("value", StringType)
+
+ val batch = microbatchOf(schema)()
+
+ val processor = Scd1BatchProcessor(
+ changeArgs = ChangeArgs(
+ keys = Seq(UnqualifiedColumnName("id")),
+ sequencing = F.col("seq"),
+ storedAsScdType = ScdType.Type1
+ )
+ )
+
+ val result = processor.deduplicateMicrobatch(batch)
+ assert(result.collect().isEmpty)
+ assert(columnNamesAndDataTypes(result.schema) ==
columnNamesAndDataTypes(schema))
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]