Eliaaazzz opened a new pull request, #39090:
URL: https://github.com/apache/beam/pull/39090
Stacked on #39023 (the Watch transform MVP); the first commit belongs to
that PR, so the change to review is the second commit. Relates to #18459 (the
unbounded `completed` dedup set).
By default, Watch dedups by value identity and keeps one hash per distinct
output, so the per-input state grows without bound. This adds an opt-in
`Watch.with_timestamp_cursor()` that dedups by a high-water-mark timestamp
instead. Watch keeps only the greatest event time it has emitted for an input,
emits only polled outputs whose timestamps are strictly greater than it, and
then advances the cursor. No hash set is kept and the poll result is not
hashed, so the per-input state and the per-checkpoint encoding are O(1)
regardless of how many outputs the input produces. It is intended for sources
whose outputs carry strictly increasing event-time timestamps. An output at or
below the cursor is treated as already seen and is not emitted, so sources that
relist arbitrarily or emit out of order should keep the default exact dedup.
This also documents the event-time/watermark contract and adds a throttled
late-emission warning.
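The two dedup strategies can be contrasted in a short standalone Python sketch. It is an illustration, not the Beam implementation: `HashSetDedup`, `TimestampCursorDedup`, and the `(value, event_time)` tuple shape are hypothetical names chosen here, and the set stores values directly where the default Watch dedup keeps a hash per output.

```python
from dataclasses import dataclass, field
from typing import Hashable, Iterable, List, Optional, Set, Tuple

# Hypothetical shape of a polled output: (value, event_time).
Output = Tuple[Hashable, float]


@dataclass
class HashSetDedup:
    """Exact dedup: remembers every distinct value it has emitted.

    Storing the value stands in for the per-output hash the default keeps;
    either way the state grows with the number of distinct outputs.
    """
    seen: Set[Hashable] = field(default_factory=set)

    def emit(self, polled: Iterable[Output]) -> List[Output]:
        new: List[Output] = []
        for value, ts in polled:
            if value not in self.seen:
                self.seen.add(value)
                new.append((value, ts))
        return new


@dataclass
class TimestampCursorDedup:
    """High-water-mark dedup: remembers only the greatest emitted event time."""
    cursor: Optional[float] = None

    def emit(self, polled: Iterable[Output]) -> List[Output]:
        # Outputs at or below the cursor are treated as already seen.
        new = [(value, ts) for value, ts in polled
               if self.cursor is None or ts > self.cursor]
        if new:
            # Advance the cursor to the newest event time just emitted.
            self.cursor = max(ts for _, ts in new)
        return new


rounds = [
    [("a", 1.0), ("b", 2.0)],
    [("a", 1.0), ("b", 2.0), ("c", 3.0)],  # source relists a and b
    [("d", 2.5), ("e", 4.0)],              # d arrives out of order
]
default, cursor = HashSetDedup(), TimestampCursorDedup()
for polled in rounds:
    print("default:", default.emit(polled), "cursor:", cursor.emit(polled))
print("state:", len(default.seen), "values vs cursor", cursor.cursor)
```

In the third round, `d` carries an event time below the cursor, so the cursor variant skips it while the set-based variant emits it. That is the trade-off for out-of-order sources. Meanwhile the set holds five entries and keeps growing, while the cursor stays a single float.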
Testing: 26 unit tests and 5 subtests on the in-memory DirectRunner; yapf
0.43.0, isort 7.0.0, and pylint clean. Load tested on Dataflow Runner v2
(us-east1) against the default hash dedup, with 4 inputs emitting 15,000
outputs per 1s round over a 240s budget. The cursor processed 6,180,000 outputs
versus the default's 2,190,000 in the same budget. The default re-serializes
its growing dedup set on every checkpoint (reaching 500K-660K entries, or
15-20 MB, per input), while the cursor writes a single timestamp, so the gap
widens with runtime. Both stayed exactly-once. Jobs on apache-beam-testing: cursor
`2026-06-24_07_27_48-14875934854431626885`, default
`2026-06-24_07_28_07-973880919496489449`.
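Why the gap widens with runtime can be shown with a simplified cost model. This is a hypothetical sketch, not a reproduction of the load test above: it assumes one checkpoint per round and that every polled output is new, and `checkpoint_entries_written` is a name introduced only for illustration.

```python
def checkpoint_entries_written(rounds: int, outputs_per_round: int):
    """Return (default_total, cursor_total) entries serialized over a run.

    Simplified, hypothetical model: one checkpoint per round, and every
    polled output is new. The default dedup set grows by
    `outputs_per_round` each round and is re-encoded in full at every
    checkpoint; the cursor encodes one timestamp per checkpoint.
    """
    set_size = 0
    default_total = 0
    for _ in range(rounds):
        set_size += outputs_per_round
        default_total += set_size  # whole set re-serialized each checkpoint
    cursor_total = rounds  # one timestamp per checkpoint
    return default_total, cursor_total


print(checkpoint_entries_written(3, 10))  # (60, 3)
print(checkpoint_entries_written(6, 10))  # (210, 6)
```

In this model the default's total is `outputs_per_round * rounds * (rounds + 1) / 2`, so doubling the run length multiplies it by 3.5 here while the cursor's total only doubles. Actual checkpoint frequency and encoded sizes on Dataflow will differ.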
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [x] Mention the appropriate issue in your description (for example:
`addresses #123`), if applicable. This will automatically add a link to the
pull request in the issue. If you would like the issue to automatically close
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md).
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI or the [workflows
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md)
to see a list of phrases to trigger workflows.