Eliaaazzz opened a new pull request, #39090:
URL: https://github.com/apache/beam/pull/39090

   Stacked on #39023 (the Watch transform MVP); the first commit belongs to 
that PR, so the change to review is the second commit. Relates to #18459 (the 
unbounded `completed` dedup set).
   
   By default, Watch dedups by value identity and keeps one hash per distinct 
output, so the per-input state grows without bound. This PR adds an opt-in 
`Watch.with_timestamp_cursor()` that dedups by a high-water-mark timestamp 
instead: Watch keeps only the greatest event time it has emitted for an input, 
emits the polled outputs whose timestamps are strictly past it, and then 
advances the cursor. No hash set is kept and the poll result is not hashed, so 
the per-input state and per-checkpoint encoding are O(1) regardless of how many 
outputs the input produces. The cursor is meant for sources whose outputs carry 
strictly increasing event-time timestamps: an output at or below the cursor is 
treated as already seen and is not emitted again, so the default exact dedup 
remains the right choice for arbitrary-relisting or out-of-order sources. This 
PR also documents the event-time/watermark contract and adds a throttled 
late-emission warning.
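
To make the trade-off concrete for reviewers, here is a minimal standalone sketch of the two dedup strategies described above. It is not the code in this PR: the class names `HashDedupState` and `TimestampCursorState`, the `filter_new` method, and the `(value, timestamp)` tuple shape are hypothetical, and Python's built-in `hash()` stands in for whatever hashing Watch actually uses.

```python
from dataclasses import dataclass, field
from typing import Hashable, Iterable, List, Optional, Set, Tuple

# Hypothetical shape: an output paired with its event-time timestamp.
TimestampedOutput = Tuple[Hashable, float]


@dataclass
class HashDedupState:
    """Default-style dedup (illustrative): one hash per distinct output."""
    seen: Set[int] = field(default_factory=set)

    def filter_new(
            self, polled: Iterable[TimestampedOutput]
    ) -> List[TimestampedOutput]:
        new = []
        for value, ts in polled:
            h = hash(value)  # stand-in for the real hashing
            if h not in self.seen:
                self.seen.add(h)  # state grows with every distinct output
                new.append((value, ts))
        return new


@dataclass
class TimestampCursorState:
    """Cursor-style dedup (illustrative): only the greatest emitted time."""
    cursor: Optional[float] = None

    def filter_new(
            self, polled: Iterable[TimestampedOutput]
    ) -> List[TimestampedOutput]:
        # Keep only outputs strictly past the cursor.
        new = [(value, ts) for value, ts in polled
               if self.cursor is None or ts > self.cursor]
        if new:
            # Advance the cursor to the greatest event time just emitted.
            self.cursor = max(ts for _, ts in new)
        return new


if __name__ == "__main__":
    hash_state, cursor_state = HashDedupState(), TimestampCursorState()
    first_poll = [("a", 1.0), ("b", 2.0)]
    relisted = first_poll + [("c", 3.0)]        # source re-lists old outputs
    out_of_order = relisted + [("late", 2.5)]   # timestamp behind the cursor
    for poll in (first_poll, relisted, out_of_order):
        print(hash_state.filter_new(poll), cursor_state.filter_new(poll))
```

In this sketch both strategies emit `a`, `b`, and then `c` exactly once, but only the hash-based state emits the out-of-order `late` output; the cursor treats it as already seen. That is the case the strictly-increasing-timestamp requirement is meant to rule out.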
   
   Testing: 26 unit tests and 5 subtests on the in-memory DirectRunner; yapf 
0.43.0, isort 7.0.0, and pylint are clean. Load tested on Dataflow Runner v2 
(us-east1) against the default hash dedup, with 4 inputs emitting 15,000 
outputs per 1s round over a 240s budget: the cursor processed 6,180,000 outputs 
versus the default's 2,190,000 in the same budget (about 2.8x). The default 
re-serializes its growing dedup set on every checkpoint (it reached 500K-660K 
entries, 15-20 MB per input), while the cursor writes one timestamp, so the gap 
widens with runtime. Both stayed exactly-once. Jobs on apache-beam-testing: 
cursor `2026-06-24_07_27_48-14875934854431626885`, default 
`2026-06-24_07_28_07-973880919496489449`.
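
The checkpoint-cost argument above can be illustrated with a rough model. This is only a sketch under stated assumptions: `pickle` stands in for the real state coder, each dedup entry is modeled as a 16-byte hash, the cursor is modeled as an integer microsecond timestamp, and the function names are hypothetical. The byte counts it produces are not the Dataflow measurements quoted above.

```python
import pickle


def checkpoint_bytes_hash_set(num_outputs: int) -> int:
    """Encoded size of a dedup set with one 16-byte entry per output.

    Assumption: pickle approximates the coder, 16 bytes approximates a hash.
    """
    seen = {i.to_bytes(16, "big") for i in range(num_outputs)}
    return len(pickle.dumps(seen))


def checkpoint_bytes_cursor(max_event_time_micros: int) -> int:
    """Encoded size of a single high-water-mark timestamp."""
    return len(pickle.dumps(max_event_time_micros))


if __name__ == "__main__":
    for n in (1_000, 10_000, 100_000):
        print(
            f"{n:>7} outputs: hash set {checkpoint_bytes_hash_set(n)} bytes, "
            f"cursor {checkpoint_bytes_cursor(1_750_000_000_000_000)} bytes")
```

Under this model the hash-set encoding grows linearly with the number of distinct outputs and is rewritten on every checkpoint, while the cursor encoding stays a few bytes, which is consistent with the throughput gap widening as the job runs.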
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [x] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   

