junaiddshaukat opened a new pull request, #38987:
URL: https://github.com/apache/beam/pull/38987

   ## Summary
   Part 2 of the WatermarkManager (part 1 = #38957). Wires the in-memory
   WatermarkManager into the data path so each fused stage computes and forwards
   its input watermark through it, replacing the provisional "flush on every
   received watermark" behavior.
   
   This is the in-JVM wiring. The stage still forwards its output watermark
   downstream via `ctx.forward` (now gated by the WatermarkManager so it only
   forwards when `min()` advances), so watermarks keep propagating and are
   observed in tests. Only the durable/distributed produce side is deferred:
   flushing the report atomically with the EOS offset commit, fanning it out to
   all downstream partitions, and a serde so it can cross topic boundaries. That
   work needs the topic-based shuffle (GroupByKey) infrastructure, which isn't
   built yet.
   
   ## Changes
   - `KStreamsPayload`: watermark variant carries `(sourcePartition,
     totalSourcePartitions)` in-band with the millis.
   - `ExecutableStageProcessor`: feeds reports to a `WatermarkManager` and forwards
     the output watermark only when it advances, stamped as the stage's own single
     source (0 of 1); data is still processed while the watermark is held. The SDK
     harness is created lazily on the first data element.
   - `ImpulseProcessor`: stamps its terminal `TIMESTAMP_MAX_VALUE` as `(0, 1)`.
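   
   The in-band payload change can be sketched as a tagged value whose watermark
   variant carries its source coordinates next to the millis. `PayloadSketch`,
   its factory methods and `TERMINAL_WATERMARK_MILLIS` are hypothetical names,
   not the `KStreamsPayload` API; in particular `Long.MAX_VALUE` is only a
   placeholder for Beam's `TIMESTAMP_MAX_VALUE`.
   
   ```java
   /**
    * Hypothetical sketch (not the PR's KStreamsPayload): either a data element or
    * a watermark stamped with (sourcePartition, totalSourcePartitions).
    */
   final class PayloadSketch<T> {
     // Placeholder for Beam's terminal watermark (TIMESTAMP_MAX_VALUE) in millis.
     static final long TERMINAL_WATERMARK_MILLIS = Long.MAX_VALUE;
   
     private final T element;                // meaningful only for the data variant
     private final boolean watermarkVariant;
     private final long watermarkMillis;      // meaningful only for the watermark variant
     private final int sourcePartition;
     private final int totalSourcePartitions;
   
     private PayloadSketch(T element, boolean watermarkVariant, long watermarkMillis,
         int sourcePartition, int totalSourcePartitions) {
       this.element = element;
       this.watermarkVariant = watermarkVariant;
       this.watermarkMillis = watermarkMillis;
       this.sourcePartition = sourcePartition;
       this.totalSourcePartitions = totalSourcePartitions;
     }
   
     static <T> PayloadSketch<T> data(T element) {
       return new PayloadSketch<>(element, false, 0L, -1, -1);
     }
   
     static <T> PayloadSketch<T> watermark(int sourcePartition, int totalSourcePartitions, long millis) {
       if (totalSourcePartitions <= 0 || sourcePartition < 0 || sourcePartition >= totalSourcePartitions) {
         throw new IllegalArgumentException("bad source " + sourcePartition + " of " + totalSourcePartitions);
       }
       return new PayloadSketch<>(null, true, millis, sourcePartition, totalSourcePartitions);
     }
   
     /** A stage (or Impulse) emits its output watermark as its own single source: 0 of 1. */
     static <T> PayloadSketch<T> singleSourceWatermark(long millis) {
       return watermark(0, 1, millis);
     }
   
     boolean isWatermark() { return watermarkVariant; }
   
     T element() {
       if (watermarkVariant) throw new IllegalStateException("not a data payload");
       return element;
     }
   
     long watermarkMillis() { requireWatermark(); return watermarkMillis; }
     int sourcePartition() { requireWatermark(); return sourcePartition; }
     int totalSourcePartitions() { requireWatermark(); return totalSourcePartitions; }
   
     private void requireWatermark() {
       if (!watermarkVariant) throw new IllegalStateException("not a watermark payload");
     }
   }
   ```
   
   Under these assumptions, Impulse's terminal report would be
   `PayloadSketch.singleSourceWatermark(PayloadSketch.TERMINAL_WATERMARK_MILLIS)`,
   and a stage would wrap each advanced `min()` the same way before forwarding it.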
   
   ## Out of scope (later, depend on topic-based shuffle)
   - Producing the report atomically with the EOS offset commit.
   - Fan-out to all downstream partitions + a serde for topic boundaries.
   - Real-Kafka integration tests over the 5 scenarios; watermark holds /
     persistence and downstream timer firing.
   
   ## Testing
   - `ExecutableStageProcessorWatermarkTest` (MockProcessorContext):
     hold-until-all-report, min, monotonic non-re-forward, single-source stamping.
   - `WatermarkPropagationTest` (TopologyTestDriver): terminal watermark
     propagates Impulse -> ExecutableStage -> recording sink.
   - `./gradlew :runners:kafka-streams:check` green; 30 runner tests.
   
   Closes #38977
   Refs #18479
   cc @je-ik
