szehon-ho commented on PR #56016:
URL: https://github.com/apache/spark/pull/56016#issuecomment-4523610293
**Suggestion: centralize microbatch pipeline ordering on
`Scd1BatchProcessor`**
`Scd1ForeachBatchExec.execute` wires a fixed sequence of public
`Scd1BatchProcessor` steps. That's clear today, but once this is hooked into a
real foreachBatch callback, any caller could invoke those methods out of order
(e.g. tombstones before CDC metadata, or merges before projection).
Spark-style library code usually hides that ordering behind a single entry
point rather than exposing every step.
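Here is a toy sketch that makes the out-of-order risk concrete. It uses plain Scala collections instead of DataFrames, and `ChangeRow`, `dedupeLatest`, and `dropTombstones` are hypothetical stand-ins, not methods from the PR. It pairs dedupe with tombstone handling rather than the exact steps named above, but the failure mode is the same: when both steps are public, reversing them silently changes the result.

```scala
// Hypothetical stand-in for a CDC microbatch row; not the PR's schema.
final case class ChangeRow(key: String, seq: Long, isDelete: Boolean)

// Both steps are public, so nothing stops a caller from reordering them.
object PublicSteps {
  // Keep only the highest-seq change per key.
  def dedupeLatest(rows: Seq[ChangeRow]): Seq[ChangeRow] =
    rows.groupBy(_.key).values.map(_.maxBy(_.seq)).toSeq.sortBy(_.key)

  // Drop delete markers (a hypothetical "tombstone" step).
  def dropTombstones(rows: Seq[ChangeRow]): Seq[ChangeRow] =
    rows.filterNot(_.isDelete)
}

val batch = Seq(
  ChangeRow("a", 1, isDelete = false),
  ChangeRow("a", 2, isDelete = true), // the latest change for "a" is a delete
  ChangeRow("b", 1, isDelete = false))

import PublicSteps._
// Intended order: dedupe first, so "a" resolves to its delete and is dropped.
val intended = dropTombstones(dedupeLatest(batch))
// Reversed order: the delete is filtered out first, so the stale "a" at seq 1 survives.
val reordered = dedupeLatest(dropTombstones(batch))
```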
For example:
```scala
// Scd1BatchProcessor: public API
// Optionally validates first, then runs dedupe → CDC metadata → projection → tombstones.
def reconcileMicrobatch(
    batchDf: DataFrame,
    auxiliaryTableDf: DataFrame): DataFrame = ???

// Scd1ForeachBatchExec
def execute(batchDf: DataFrame, batchId: Long): Unit = {
  ScdBatchValidator(...).validateMicrobatch()
  val reconciled = batchProcessor.reconcileMicrobatch(
    batchDf,
    batchDf.sparkSession.read.table(auxiliaryTableIdentifier.quotedString))
  batchProcessor.mergeMicrobatchOntoAuxiliaryTable(reconciled, auxiliaryTableIdentifier)
  batchProcessor.mergeMicrobatchOntoTarget(reconciled, targetTableIdentifier)
}
```
1. Add `reconcileMicrobatch` to `Scd1BatchProcessor` to run dedupe → CDC
metadata → projection → tombstones in a fixed order. Validation could be folded
in as the first step, or stay in `ScdBatchValidator` and be called from the
exec, as above.
2. Narrow the existing step methods to `private[autocdc]` so
`Scd1BatchProcessorSuite` and the merge suites can still unit-test them
individually.
3. Slim `Scd1ForeachBatchExec` down to validate → read aux →
`reconcileMicrobatch` → merge aux → merge target.
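The following is a minimal sketch of items 1 and 2. It assumes plain Scala collections in place of DataFrames and uses hypothetical names throughout (`AutoCdcSketch`, `Scd1BatchProcessorSketch`, `ChangeRow`, `StepChecks`). Scala allows the `private[X]` qualifier to name an enclosing object as well as a package. Here the enclosing object plays the role of the `autocdc` package: code inside it, such as a test suite in the same package, can call individual steps, while outside callers only see `reconcileMicrobatch`.

```scala
// Hypothetical names throughout; not the PR's actual classes.
object AutoCdcSketch {
  final case class ChangeRow(key: String, seq: Long, isDelete: Boolean)

  final class Scd1BatchProcessorSketch {
    // Single public entry point: the step order is fixed here.
    def reconcileMicrobatch(rows: Seq[ChangeRow]): Seq[ChangeRow] =
      dropTombstones(dedupeLatest(rows))

    // Visible only inside AutoCdcSketch, mirroring `private[autocdc]`.
    private[AutoCdcSketch] def dedupeLatest(rows: Seq[ChangeRow]): Seq[ChangeRow] =
      rows.groupBy(_.key).values.map(_.maxBy(_.seq)).toSeq.sortBy(_.key)

    private[AutoCdcSketch] def dropTombstones(rows: Seq[ChangeRow]): Seq[ChangeRow] =
      rows.filterNot(_.isDelete)
  }

  // Stand-in for a unit-test suite in the same scope: it can exercise one step in isolation.
  object StepChecks {
    private val p = new Scd1BatchProcessorSketch
    def dedupeKeepsLatest: Boolean =
      p.dedupeLatest(Seq(ChangeRow("a", 1, false), ChangeRow("a", 2, true))) ==
        Seq(ChangeRow("a", 2, true))
  }
}

val processor = new AutoCdcSketch.Scd1BatchProcessorSketch
// processor.dedupeLatest(...) would not compile here, because this code is outside AutoCdcSketch.
```

With this split, the suites keep targeted per-step tests, and the exec (or a renamed handler) has only one ordering-sensitive call to get right.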
That preserves testability without exposing a composable-but-unsafe public
pipeline API. Minor naming nit: `*Exec` usually means a physical operator in
Spark; something like `Scd1ForeachBatchHandler` / `Scd1MicrobatchReconciler`
might fit pipelines better if you're open to a rename.