junaiddshaukat opened a new pull request, #38957:
URL: https://github.com/apache/beam/pull/38957
## Summary
First slice of the WatermarkManager, the prerequisite for any stateful
transform (GroupByKey etc.). The full change is too large for one PR, so it is
split into parts; this part is the in-memory core, decoupled from the Kafka
wiring so that it can be unit-tested in isolation. The plan was agreed with
@je-ik on Slack.
## Design (agreed with @je-ik)
A stage's input watermark is min() over its upstream source partitions'
committed watermarks. Tracking is keyed on source partitions, not producer
instances:
- the total source-partition count travels in-band with every report, so the
reader always knows how many partitions it is waiting for;
- a partition is owned by exactly one live instance; when an instance fails,
its partitions are reassigned and the new owner keeps reporting, so a killed
instance never leaves the reader stuck (no instance liveness tracking, no
describeConsumerGroups, no generationId needed).
Validated in a standalone Kafka Streams PoC before implementation.
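
To illustrate why keying on source partitions makes instance liveness irrelevant, here is a minimal sketch. It is not code from this PR: the class name, the `report` method, and the `instanceId` parameter are hypothetical and exist only to show that the reporter's identity is never consulted.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Illustration only: hypothetical names, not code from this PR. */
final class PartitionKeyedTrackingDemo {
  // Latest committed watermark per source partition. The reporting
  // instance is deliberately not part of the key.
  private final Map<Integer, Long> byPartition = new HashMap<>();

  /** Returns min() over all partitions, or empty while any partition is still missing. */
  OptionalLong report(String instanceId, int sourcePartition,
      long committedWatermarkMillis, int totalSourcePartitions) {
    // instanceId is accepted but unused: only the partition matters.
    byPartition.put(sourcePartition, committedWatermarkMillis);
    if (byPartition.size() < totalSourcePartitions) {
      return OptionalLong.empty();
    }
    return OptionalLong.of(Collections.min(byPartition.values()));
  }
}
```

With two source partitions, instance `"a"` reporting partition 0 at 100 and partition 1 at 120 yields 100. If `"a"` is then killed and instance `"b"` takes over and reports partition 0 at 150, the result is 120; the reader never needed to learn that `"a"` went away.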
## Scope (this PR)
- `WatermarkManager`: `observe(sourcePartition, committedWatermarkMillis,
totalSourcePartitions)`. The output holds at
`BoundedWindow.TIMESTAMP_MIN_VALUE` until every source partition has reported,
then emits `min()` over the reported watermarks. Output is clamped to be
non-decreasing. A change in `totalSourcePartitions` re-opens the hold (the
"revert" case, without an explicit epoch).
- 9 unit tests: hold/emit, per-partition monotonicity, no-regression clamp,
partition-count increase/decrease, argument validation.
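
The semantics listed above can be sketched roughly as follows. This is not the class in this PR. It assumes several things the description does not state: a `long` stand-in replaces `BoundedWindow.TIMESTAMP_MIN_VALUE` to avoid a Beam dependency, per-partition regressions are ignored, earlier reports are discarded when the partition count changes, and a re-opened hold freezes the output at the last emitted value.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of the described behavior; not the PR's implementation. */
final class WatermarkManagerSketch {
  // Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE (assumption, see lead-in).
  static final long MIN_WATERMARK = Long.MIN_VALUE;

  private final Map<Integer, Long> committed = new HashMap<>();
  private int expectedPartitions = -1;
  private long lastEmitted = MIN_WATERMARK;

  long observe(int sourcePartition, long committedWatermarkMillis,
      int totalSourcePartitions) {
    if (totalSourcePartitions <= 0) {
      throw new IllegalArgumentException("totalSourcePartitions must be positive");
    }
    if (sourcePartition < 0 || sourcePartition >= totalSourcePartitions) {
      throw new IllegalArgumentException("sourcePartition out of range");
    }
    if (totalSourcePartitions != expectedPartitions) {
      // Partition count changed: drop earlier reports and hold again
      // (assumed reading of "re-opens the hold").
      committed.clear();
      expectedPartitions = totalSourcePartitions;
    }
    // Per-partition monotonicity: keep the larger of old and new value.
    committed.merge(sourcePartition, committedWatermarkMillis, Math::max);
    if (committed.size() < expectedPartitions) {
      return lastEmitted; // still holding
    }
    long min = Collections.min(committed.values());
    // No-regression clamp: the output never goes backwards.
    lastEmitted = Math.max(lastEmitted, min);
    return lastEmitted;
  }
}
```

For example, with two partitions, `observe(0, 100, 2)` holds at the minimum, `observe(1, 50, 2)` emits 50, and `observe(1, 200, 2)` emits 100. A following `observe(0, 500, 3)` stays at 100 until partitions 1 and 2 have also reported under the new count.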
## Out of scope (later parts)
- Part 2: wire into `ExecutableStageProcessor`: flush
`(sourcePartition, committedWatermark, totalSourcePartitions)` atomically
with the offset commit (EOS), fan out to all downstream partitions, consume
the reports and feed the manager, and replace the provisional "flush on every
watermark" behavior. Also real-Kafka integration tests over the 5 scenarios
(steady, scale-out, clean scale-in, SIGKILL, partition reassignment).
- Part 3: persistence / watermark holds and downstream timer firing once
state + timers land.
## Testing
`./gradlew :runners:kafka-streams:check` green; 9 new unit tests.
Refs #18479