junaiddshaukat opened a new issue, #38955:
URL: https://github.com/apache/beam/issues/38955

   Tracking issue: #18479
   Follows: #38843 (Redistribute + type-agnostic ExecutableStage edge, merged)
   
   ## Summary
   First slice of the WatermarkManager. The runner needs watermark tracking
   before any stateful transform (GroupByKey, etc.) can land, so this goes in
   next, per the plan agreed with @je-ik. The full WatermarkManager is too large
   for one PR, so it is split; this part is the in-memory core, decoupled from
   the Kafka wiring so it can be unit-tested in isolation.
   
   ## Design (agreed with @je-ik on Slack)
   A stage's input watermark is min() over its upstream source partitions'
   committed watermarks. Tracking is keyed on *source partitions*, not producer
   instances:
   - the total source-partition count travels in-band with every report, so the
     reader always knows how many it's waiting for;
   - a partition is owned by exactly one live instance; when an instance fails,
     its partitions are reassigned and the new owner keeps reporting, so a
     killed instance never leaves the reader stuck (no instance liveness
     tracking, no describeConsumerGroups, and no generationId needed).
   This was validated in a standalone Kafka Streams PoC before implementation.
   
   ## Scope (this PR)
   - WatermarkManager: observe(sourcePartition, committedWatermarkMillis,
     totalSourcePartitions); holds at BoundedWindow.TIMESTAMP_MIN_VALUE until
     every source partition has reported; then emits min(); output is clamped
     non-decreasing; a change in totalSourcePartitions re-opens the hold (the
     "revert" case, without an explicit epoch).
   - Unit tests for hold/emit, monotonicity, partition-count change, validation.
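
   A minimal sketch of the hold/emit rule described above, not the code in this
   PR. The class name WatermarkManagerSketch, the HOLD constant (a plain long
   standing in for BoundedWindow.TIMESTAMP_MIN_VALUE), 0-based integer partition
   ids, and the validation rules are all assumptions made for illustration. It
   also assumes that re-opening the hold keeps the last emitted value instead of
   dropping back to the minimum, which is only one reading of "clamped
   non-decreasing".

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only; names and details are hypothetical, not the PR's class. */
final class WatermarkManagerSketch {
  /** Stand-in for BoundedWindow.TIMESTAMP_MIN_VALUE, kept as a plain long here. */
  static final long HOLD = Long.MIN_VALUE;

  // Latest committed watermark reported by each source partition.
  private final Map<Integer, Long> committed = new HashMap<>();
  // Partition count carried by the most recent report; -1 until the first one.
  private int expectedPartitions = -1;
  // Last value returned once all partitions had reported; never decreases.
  private long lastEmitted = HOLD;

  /** Records one report and returns the current input watermark in millis. */
  long observe(int sourcePartition, long committedWatermarkMillis, int totalSourcePartitions) {
    if (totalSourcePartitions <= 0) {
      throw new IllegalArgumentException("totalSourcePartitions must be positive");
    }
    if (sourcePartition < 0 || sourcePartition >= totalSourcePartitions) {
      throw new IllegalArgumentException("sourcePartition out of range");
    }
    // A changed partition count re-opens the hold: forget earlier reports and
    // wait until every partition of the new layout has reported.
    if (totalSourcePartitions != expectedPartitions) {
      committed.clear();
      expectedPartitions = totalSourcePartitions;
    }
    committed.put(sourcePartition, committedWatermarkMillis);

    // Still waiting for at least one partition: keep the previous output.
    if (committed.size() < expectedPartitions) {
      return lastEmitted;
    }
    long min = Long.MAX_VALUE;
    for (long watermark : committed.values()) {
      min = Math.min(min, watermark);
    }
    // Clamp so the emitted watermark never moves backwards.
    lastEmitted = Math.max(lastEmitted, min);
    return lastEmitted;
  }
}
```

   With two partitions, the first report returns HOLD, the second returns the
   smaller of the two watermarks, and a later report carrying a different
   totalSourcePartitions holds the output at its last value until every
   partition of the new layout has reported.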
   
   ## Out of scope (later parts)
   - Part 2: wire it into ExecutableStageProcessor — flush
     (sourcePartition, committedWatermark, totalSourcePartitions) atomically with
     the offset commit (EOS), fan out to all downstream partitions, consume +
     feed the manager, replace the provisional "flush on every watermark"
     behavior; real-Kafka integration tests over the 5 scenarios (steady,
     scale-out, clean scale-in, SIGKILL, partition reassignment).
   - Part 3: persistence / watermark holds and downstream timer firing once
     state + timers land.
   
   ## Testing
   9 unit tests; ./gradlew :runners:kafka-streams:check green.
   

