szehon-ho commented on code in PR #56016:
URL: https://github.com/apache/spark/pull/56016#discussion_r3293364524


##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandler.scala:
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.classic.DataFrame
+
+/**
+ * Exposes an API to execute one SCD Type 1 AutoCDC microbatch reconciliation on a
+ * foreachBatch streaming query.
+ */
+case class Scd1ForeachBatchHandler(
+    batchProcessor: Scd1BatchProcessor,
+    auxiliaryTableIdentifier: TableIdentifier,
+    targetTableIdentifier: TableIdentifier) {
+
+  /**
+   * Process a single CDC microbatch and merge it into the auxiliary and target tables.
+   *
+   * The body is intentionally a thin orchestration of three calls - validate, reconcile,

Review Comment:
   nit: this comment is a bit overly detailed. Non-blocking.
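The orchestration the doc comment describes can be sketched with plain Scala collections standing in for DataFrames. Everything here is hypothetical (`CdcEvent`, `HandlerSketch`, and the body of each step), and the diff truncates the comment after "validate, reconcile,", so modeling the last call as a merge is an assumption drawn from the `reconcileMicrobatch` doc ("ready to be merged onto both tables").

```scala
// Hypothetical stand-in for one CDC row: a key, its sequence number,
// whether it is a delete, and a single payload value.
final case class CdcEvent(key: String, seq: Long, isDelete: Boolean, value: String)

object HandlerSketch {
  /** Validates, reconciles, and merges one microbatch into `target`. */
  def processBatch(target: Map[String, String], batch: Seq[CdcEvent]): Map[String, String] = {
    validate(batch)
    merge(target, reconcile(batch))
  }

  // Assumed validation step: reject rows with a null key.
  private def validate(batch: Seq[CdcEvent]): Unit =
    require(batch.forall(_.key != null), "CDC keys must be non-null")

  // Assumed reconcile step: keep only the latest event per key.
  private def reconcile(batch: Seq[CdcEvent]): Seq[CdcEvent] =
    batch.groupBy(_.key).values.map(_.maxBy(_.seq)).toSeq

  // Assumed merge step: SCD1 upserts overwrite the value, deletes remove the key.
  private def merge(target: Map[String, String], events: Seq[CdcEvent]): Map[String, String] =
    events.foldLeft(target) { (acc, e) =>
      if (e.isDelete) acc - e.key else acc.updated(e.key, e.value)
    }
}
```

Because the body is just three calls, a one-line Scaladoc on the entry point already conveys what it does, which is the spirit of the nit.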



##########
sql/pipelines/src/test/scala/org/apache/spark/sql/pipelines/autocdc/Scd1ForeachBatchHandlerSuite.scala:
##########
@@ -0,0 +1,640 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.pipelines.autocdc
+
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.{functions => F, AnalysisException, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types._
+
+/**
+ * E2E unit tests for the Scd1ForeachBatchHandler class.
+ */
+class Scd1ForeachBatchHandlerSuite

Review Comment:
   nit: it would be better to extend `QueryTest`.



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala:
##########
@@ -212,6 +256,151 @@ case class Scd1BatchProcessor(
     )
   }
 
+  /**
+   * Merge the reconciled (deduplicated per key) microbatch onto the auxiliary table,
+   * advancing or deleting existing tombstones and inserting new tombstones for previously
+   * untracked keys.
+   *
+   * After the merge, the auxiliary table has the same schema as before, but with the latest
+   * tombstone data per key.
+   *
+   * @param reconciledMicrobatchDf The deduplicated microbatch.
+   * @param auxiliaryTableIdentifier The identifier (not a [[DataFrame]]) of the auxiliary

Review Comment:
   nit: the "(not a [[DataFrame]])" aside isn't really necessary. Non-blocking.
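The auxiliary-table merge described in this doc comment can be modeled with an in-memory map from key to tombstone sequence. This is a minimal sketch under assumed semantics (a delete advances an existing tombstone or inserts a new one for an untracked key; a newer upsert deletes the key's tombstone). `ReconciledEvent` and `mergeTombstones` are hypothetical names, and the PR operates on a table named by `auxiliaryTableIdentifier`, not on a `Map`.

```scala
// Hypothetical model: the auxiliary table maps each key to the sequence
// number of its most recent delete (its "tombstone").
final case class ReconciledEvent(key: String, seq: Long, isDelete: Boolean)

object AuxiliaryMergeSketch {
  // Assumes `reconciled` already holds at most one event per key.
  def mergeTombstones(aux: Map[String, Long], reconciled: Seq[ReconciledEvent]): Map[String, Long] =
    reconciled.foldLeft(aux) { (acc, e) =>
      (acc.get(e.key), e.isDelete) match {
        // Delete for a tracked key: advance the tombstone.
        case (Some(ts), true) => acc.updated(e.key, math.max(ts, e.seq))
        // Newer upsert for a tracked key: the key is live again, drop its tombstone.
        case (Some(ts), false) if e.seq > ts => acc - e.key
        // Older upsert (expected to be filtered upstream): no change.
        case (Some(_), false) => acc
        // Delete for a previously untracked key: insert a new tombstone.
        case (None, true) => acc.updated(e.key, e.seq)
        // Upsert for an untracked key: nothing to record.
        case (None, false) => acc
      }
    }
}
```

The schema of the map is unchanged by the merge; only the per-key tombstone values move, mirroring the "same schema as before" note.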



##########
sql/pipelines/src/main/scala/org/apache/spark/sql/pipelines/autocdc/Scd1BatchProcessor.scala:
##########
@@ -37,6 +38,47 @@ case class Scd1BatchProcessor(
     changeArgs: ChangeArgs,
     resolvedSequencingType: DataType) {
 
+  /**
+   * Reconcile a CDC microbatch into the canonical form that the auxiliary- and target-table
+   * merges consume. Composes the per-step transforms in the only order that produces correct
+   * SCD1 semantics:
+   *
+   *   1. [[deduplicateMicrobatch]]: collapse same-key events to the latest by sequence.
+   *   2. [[extendMicrobatchRowsWithCdcMetadata]]: project the operational `_cdc_metadata` column
+   *      (must run before column selection, which may drop inputs the metadata expressions
+   *      reference).
+   *   3. [[projectTargetColumnsOntoMicrobatch]]: apply the user-defined column selection while
+   *      preserving the CDC metadata column.
+   *   4. [[applyTombstonesToMicrobatch]]: filter out late-arriving events superseded by
+   *      tombstones already recorded in the auxiliary table.
+   *
+   * The per-step methods are kept package-visible so that focused unit tests can pin each
+   * transform's behavior independently, but production callers should always use this entry
+   * point so the ordering is enforced.
+   *
+   * Microbatch validation (orderable sequence, non-null sequence/keys) is a separate concern
+   * handled upstream by [[ScdBatchValidator.validateMicrobatch]] and is intentionally not
+   * folded into this method - it must run before any of these transforms touch the data.
+   *
+   * @param batchDf          The validated incoming CDC microbatch.
+   * @param auxiliaryTableDf A snapshot of the auxiliary table for tombstone reconciliation.
+   *                         Must contain at minimum the key columns + `_cdc_metadata`.
+   * @return The reconciled microbatch, ready to be merged onto both tables.
+   */
+  def reconcileMicrobatch(

Review Comment:
   nit: can it be private? We always want validation to run first.
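The reviewer's suggestion can be sketched with plain Scala collections standing in for DataFrames: the four per-step transforms and the fixed-order reconcile become private, and the only public method validates before reconciling. All names here (`ChangeRow`, `ReconcileSketch`, `validateAndReconcile`, the `op` column) are hypothetical, and the metadata and tombstone logic is a simplified assumption, not the PR's implementation. Step 2 derives metadata from `op` before step 3 drops it, which is why the ordering matters.

```scala
// Hypothetical CDC row: key, sequence number, and named columns.
final case class ChangeRow(key: String, seq: Long, cols: Map[String, String])

object ReconcileSketch {
  /** The only public entry point: validation always runs before the transforms. */
  def validateAndReconcile(
      batch: Seq[ChangeRow],
      tombstones: Map[String, Long],
      selectedColumns: Set[String]): Seq[ChangeRow] = {
    require(batch.forall(_.key != null), "CDC keys must be non-null")
    reconcile(batch, tombstones, selectedColumns)
  }

  // The fixed step order lives in one private method.
  private def reconcile(
      batch: Seq[ChangeRow],
      tombstones: Map[String, Long],
      selectedColumns: Set[String]): Seq[ChangeRow] = {
    val deduped = deduplicate(batch)
    val withMeta = deduped.map(extendWithCdcMetadata)
    val projected = withMeta.map(projectColumns(_, selectedColumns))
    applyTombstones(projected, tombstones)
  }

  // 1. Keep only the latest event per key.
  private def deduplicate(batch: Seq[ChangeRow]): Seq[ChangeRow] =
    batch.groupBy(_.key).values.map(_.maxBy(_.seq)).toSeq

  // 2. Derive metadata from the "op" column, which projection may drop later.
  private def extendWithCdcMetadata(r: ChangeRow): ChangeRow = {
    val op = r.cols.getOrElse("op", "?")
    r.copy(cols = r.cols + ("_cdc_metadata" -> s"$op@${r.seq}"))
  }

  // 3. Keep the user-selected columns plus the metadata column.
  private def projectColumns(r: ChangeRow, selected: Set[String]): ChangeRow =
    r.copy(cols = r.cols.filter { case (k, _) => selected(k) || k == "_cdc_metadata" })

  // 4. Drop events at or before the key's recorded tombstone.
  private def applyTombstones(rows: Seq[ChangeRow], tombstones: Map[String, Long]): Seq[ChangeRow] =
    rows.filter(r => tombstones.get(r.key).forall(r.seq > _))
}
```

The trade-off is that private steps can no longer be unit-tested directly, which the doc comment gives as the reason they are package-visible; keeping the steps package-visible while making the ordered entry point itself call validation is one way to satisfy both concerns.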



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
