ericm-db opened a new pull request, #56019:
URL: https://github.com/apache/spark/pull/56019

   ### What changes were proposed in this pull request?
   
   Add the commit log data structures for streaming sink evolution:
   
   - `CommitMetadataV3` (`VERSION_3` of the commit log wire format) carries a 
`sinkMetadataMap: Map[String, SinkMetadataInfo]` keyed by sink name, in 
addition to the V2 fields (`nextBatchWatermarkMs`, `stateUniqueIds`).
   - `SinkMetadataInfo` records per-sink metadata: `sinkName`, `commitOffset` 
(serialized via `OffsetV2.json()`), `providerName`, and an `isActive` flag that 
distinguishes the current sink from historical sinks that were used in earlier 
batches but have since been replaced.
   - `CommitMetadataV3.activeSinkMetadataInfoOpt` returns the entry with 
`isActive = true`, if any.
   - `CommitLog.createMetadata` now produces a `CommitMetadataV3` when 
`commitLogFormatVersion = VERSION_3`, and requires a non-empty `sinkMetadataMap` in that case.
   - `CommitLog.readCommitMetadata` dispatches `v3` files to the new class.
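A minimal sketch of the shape of these structures, for reviewers skimming the diff. This is not the PR's code: the field names come from the description above, but `nextBatchWatermarkMs: Long` and `commitOffset: String` are assumed types, `stateUniqueIds` and JSON SerDe are omitted, and `CommitLogSketch.createV3Metadata` is a hypothetical stand-in for the `VERSION_3` branch of `CommitLog.createMetadata`.

```scala
// Hypothetical, simplified model of the V3 commit log structures.
// Types of the V2 fields are assumptions; `stateUniqueIds` is omitted.

/** Per-sink metadata; `commitOffset` holds the string produced by `OffsetV2.json()`. */
case class SinkMetadataInfo(
    sinkName: String,
    commitOffset: String,
    providerName: String,
    isActive: Boolean)

/** Commit metadata carrying every sink seen so far, keyed by sink name. */
case class CommitMetadataV3(
    nextBatchWatermarkMs: Long,
    sinkMetadataMap: Map[String, SinkMetadataInfo]) {

  /** Returns the entry flagged `isActive = true`, if any. */
  def activeSinkMetadataInfoOpt: Option[SinkMetadataInfo] =
    sinkMetadataMap.values.find(_.isActive)
}

object CommitLogSketch {
  // Hypothetical stand-in for the VERSION_3 path of `CommitLog.createMetadata`.
  def createV3Metadata(
      nextBatchWatermarkMs: Long,
      sinkMetadataMap: Map[String, SinkMetadataInfo]): CommitMetadataV3 = {
    // `require` throws IllegalArgumentException, mirroring the fail-fast
    // behavior described for an empty map.
    require(sinkMetadataMap.nonEmpty, "sinkMetadataMap must be non-empty for VERSION_3")
    CommitMetadataV3(nextBatchWatermarkMs, sinkMetadataMap)
  }
}
```

In this sketch, `activeSinkMetadataInfoOpt` returns the first active entry it finds and does not itself enforce that at most one entry is active; whether the real class validates that invariant is not stated in this description.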
   
   The V3 metadata is dormant in this PR: no caller produces it yet. Wiring it 
through `MicroBatchExecution` (so that each batch persists its sink name and offset, 
and restarts read the map back) is the SPARK-56972 follow-up.
   
   This PR is built on top of #56018 (SPARK-56970). It currently shows the 
SPARK-56970 commits in its diff; that will resolve once SPARK-56970 merges.
   
   ### Why are the changes needed?
   
   SPARK-56719 added `DataStreamWriter.name()` as the API surface for sink 
evolution. Without a place in the commit log to durably record the sink name 
and offset alongside the rest of a committed batch's metadata, sink names 
cannot be observed on restart and the evolution feature cannot be completed. 
This PR introduces that storage in a separate, narrowly scoped change.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. `CommitMetadataV3` is in the internal 
`org.apache.spark.sql.execution.streaming.checkpointing` package and is not 
produced by any code path yet.
   
   ### How was this patch tested?
   
   Added unit tests in `CommitLogSuite`:
   - V3 SerDe with a single active sink (round-trips through the commit log).
   - V3 retains historical sinks alongside the active one, and 
`activeSinkMetadataInfoOpt` resolves to the active entry.
   - `createMetadata(version = V3, sinkMetadataMap = Map.empty)` fails fast 
with `IllegalArgumentException`.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude Code (claude-opus-4-7)
