Eliaaazzz opened a new pull request, #38724:
URL: https://github.com/apache/beam/pull/38724
Brings Java's ``UnboundedSource`` / ``UnboundedReader`` / ``CheckpointMark``
abstractions to the Python SDK as a Splittable-DoFn wrapper runnable on the
portable Fn API (DirectRunner / FnApiRunner). Wires the new source type into
``iobase.Read.expand()`` so ``p | beam.io.Read(my_unbounded_source)``
dispatches alongside the existing ``BoundedSource`` branch. The design is
loosely inspired by Java's ``Read.UnboundedSourceAsSDFWrapperFn``; the
process loop, watermark, and defer plumbing follow the streaming-SDF
template in ``apache_beam.transforms.periodicsequence``.
addresses #19137
## What's in this PR
Two new files under ``sdks/python/apache_beam/io/``:
* ``unbounded_source.py``: public ABCs (``CheckpointMark``,
``UnboundedReader``, ``UnboundedSource``, ``ReadFromUnboundedSource``)
plus SDF wrapper internals (``_UnboundedSourceRestriction``,
``_UnboundedSourceRestrictionCoder``,
``_UnboundedSourceRestrictionTracker``,
``_UnboundedSourceRestrictionProvider``).
* ``unbounded_source_test.py``: 42 deterministic unit + integration tests.
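
For orientation, the reader / checkpoint contract that these ABCs port from Java can be pictured with a dependency-free toy. This is only a sketch: the classes below are plain stand-ins rather than the PR's ABCs, and the method names (``start``, ``advance``, ``get_current``, ``get_checkpoint_mark``, ``create_reader``, ``finalize_checkpoint``) are assumed snake_case renderings of the Java API, not signatures confirmed by this PR.

```python
class CounterMark:
  """Toy checkpoint mark: remembers the next value to emit."""
  def __init__(self, next_value):
    self.next_value = next_value
    self.finalized = False

  def finalize_checkpoint(self):
    # Idempotent commit hook: calling it twice changes nothing.
    self.finalized = True


class CounterReader:
  """Toy reader that emits 0, 1, 2, ... and never reaches EOF."""
  def __init__(self, mark=None):
    self._next = mark.next_value if mark else 0
    self._current = None
    self.closed = False

  def start(self):
    return self.advance()

  def advance(self):
    self._current = self._next
    self._next += 1
    return True  # a record is always available in this toy

  def get_current(self):
    return self._current

  def get_checkpoint_mark(self):
    return CounterMark(self._next)

  def close(self):
    self.closed = True


class CounterSource:
  """Toy source: rebuilds a reader from an optional checkpoint mark."""
  def create_reader(self, checkpoint_mark=None):
    return CounterReader(checkpoint_mark)


def read_n(source, n, mark=None):
  """Reads n records, then returns them with a mark to resume from."""
  reader = source.create_reader(mark)
  out = []
  try:
    has_record = reader.start()
    while has_record and len(out) < n:
      out.append(reader.get_current())
      if len(out) < n:
        has_record = reader.advance()
    return out, reader.get_checkpoint_mark()
  finally:
    reader.close()
```

Reading three records and then resuming from the returned mark continues where the first read stopped, which is the behaviour the SDF wrapper relies on when it rebuilds a reader from a checkpoint.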
Plus a small change to ``iobase.py``:
* ``Read.expand`` gains an ``UnboundedSource`` branch with a function-local
lazy import to break the ``iobase`` <-> ``unbounded_source`` cycle.
* ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to
``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` +
``ReadPayload(is_bounded=UNBOUNDED)``. Decode rides the existing
``PICKLED_SOURCE`` URN registered on ``SourceBase``. Runner-side
``IsBounded.UNBOUNDED`` dispatch in
``bundle_processor.IMPULSE_READ_TRANSFORM`` is W2 work; today execution
flows through the composite's expanded ``Impulse | Map | SDF-ParDo``.
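
The function-local lazy import mentioned above can be illustrated with two throwaway modules. This is a minimal sketch, not the PR's code: ``toy_iobase``, ``toy_unbounded``, ``dispatch`` and ``ToyUnboundedSource`` are hypothetical names standing in for ``iobase``, ``unbounded_source``, ``Read.expand`` and ``UnboundedSource``.

```python
import importlib
import os
import sys
import tempfile
import textwrap

# Stand-in for iobase: it never imports the other module at load time.
BASE_MOD = textwrap.dedent('''
    class SourceBase:
      pass

    def dispatch(source):
      # Function-local import: resolved at call time, after both modules
      # have finished loading, so neither import order hits the cycle.
      from toy_unbounded import ToyUnboundedSource
      if isinstance(source, ToyUnboundedSource):
        return 'unbounded'
      return 'other'
''')

# Stand-in for unbounded_source: it imports the base module at load time.
UNBOUNDED_MOD = textwrap.dedent('''
    from toy_iobase import SourceBase

    class ToyUnboundedSource(SourceBase):
      pass
''')


def demo():
  """Writes both toy modules to a temp dir and exercises the dispatch."""
  modules = (('toy_iobase', BASE_MOD), ('toy_unbounded', UNBOUNDED_MOD))
  with tempfile.TemporaryDirectory() as tmp:
    for name, body in modules:
      with open(os.path.join(tmp, name + '.py'), 'w') as f:
        f.write(body)
    sys.path.insert(0, tmp)
    importlib.invalidate_caches()
    try:
      toy_iobase = importlib.import_module('toy_iobase')
      toy_unbounded = importlib.import_module('toy_unbounded')
      return toy_iobase.dispatch(toy_unbounded.ToyUnboundedSource())
    finally:
      # Leave the interpreter as we found it.
      sys.path.remove(tmp)
      for name, _ in modules:
        sys.modules.pop(name, None)
```

Had ``toy_iobase`` imported ``ToyUnboundedSource`` at module level instead, importing ``toy_unbounded`` first would fail on a partially initialised module; deferring the import into the function body avoids that.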
## Correctness highlights
* **Data-path watermark** uses ``reader.get_watermark()`` (Java
``Read.java:594`` parity), not the per-record event time. Holder is
``(value, record_ts, source_wm)``; the record event time labels the
emitted ``TimestampedValue``, the source watermark advances the estimator.
* **Restriction has separate channels** for resume (``checkpoint_mark``)
and commit hook (``finalization_checkpoint_mark``) so a done primary's
finalize callback cannot contaminate the residual's resume state.
* **Reader is closed on every exit path** -- EOF and split paths close
inside the tracker; ``try_claim`` / ``try_split`` wrap reader-method
calls and close before re-raising; the DoFn ``finally`` block provides
defense-in-depth for exceptions raised downstream of a ``yield``. It
reaches the reader through the SDF wrapper's private chain, with an
``isinstance`` guard and a warning log in case that chain is ever
refactored upstream.
* **EOF advances the watermark estimator to ``MAX_TIMESTAMP``** so
downstream event-time windows can close (they would otherwise hang at
the last reported source watermark).
* **Initial fan-out** via ``UnboundedSource.split(desired_num_splits=20,
options)`` -- validates returned sub-sources are ``UnboundedSource``
instances (raises ``TypeError`` outside the split-refusal ``except``);
on split-refusal exceptions, falls back to a single restriction and
logs a WARNING.
* **``default_output_coder`` wired** via
``coders.registry.register_coder`` + ``element_type`` so a custom
source-declared coder reaches the output PCollection through Beam's
standard registry lookup. Parameterised coders that cannot be
class-registered fall back gracefully with a warning -- users with such
coders must register explicitly before pipeline construction.
* **``poll_interval_seconds`` validated** to be > 0 in
``ReadFromUnboundedSource.__init__``.
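
The watermark, EOF, and reader-close highlights above can be pictured together in a simplified, Beam-free process loop. This is a sketch under stated assumptions: ``ToyReader``, ``ToyEstimator``, ``get_holder`` and ``process`` are hypothetical names, ``float('inf')`` stands in for Beam's ``MAX_TIMESTAMP``, and the single ``finally`` collapses what the PR splits between the tracker and the DoFn.

```python
MAX_TIMESTAMP = float('inf')  # stand-in for Beam's MAX_TIMESTAMP


class ToyEstimator:
  """Toy watermark estimator whose watermark only moves forward."""
  def __init__(self):
    self.watermark = float('-inf')

  def set_watermark(self, timestamp):
    self.watermark = max(self.watermark, timestamp)


class ToyReader:
  """Serves (value, record_ts, source_wm) holders from a fixed list."""
  def __init__(self, holders):
    self._holders = list(holders)
    self._index = -1
    self.closed = False

  def advance(self):
    self._index += 1
    return self._index < len(self._holders)

  def get_holder(self):
    return self._holders[self._index]

  def close(self):
    self.closed = True


def process(reader, estimator):
  """Yields (value, record_ts); the watermark follows source_wm only."""
  try:
    while reader.advance():
      value, record_ts, source_wm = reader.get_holder()
      # The source watermark, not the record's event time, advances the
      # estimator; the event time only labels the emitted element.
      estimator.set_watermark(source_wm)
      yield value, record_ts
    # EOF: release downstream event-time windows.
    estimator.set_watermark(MAX_TIMESTAMP)
  finally:
    # Runs on normal exit, on an exception, and on generator close.
    reader.close()
```

Because ``process`` is a generator, a consumer that stops early (or raises while handling a yielded element and then closes the generator) still triggers the ``finally`` block, so the toy reader is closed on that path too.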
## Test coverage
58 tests, all green locally on Python 3.13 + Beam 2.71:
* ``unbounded_source_test.py`` (42): ABC contracts; restriction coder
round-trip; restriction tracker state machine (claim / split / EOF /
no-data / check_done / progress / is_bounded); finalize idempotency;
source-watermark vs. record-timestamp regression; finalize / resume
channel separation; tracker-internal exception close on
``reader.advance`` and ``reader.get_watermark`` failures; DoFn
generator close path (unit + integration with downstream raising
``Map``); cloudpickle round-trip for transform and source; circular
import in three orderings via subprocess + tempfile; end-to-end
DirectRunner pipeline (records in order + windowed GroupByKey).
* ``iobase_test.py`` (+3): ``Read(UnboundedSource)`` dispatch through the
new ``expand`` branch; ``Read.to_runner_api`` / ``from_runner_api``
round-trip with ``IsBounded.UNBOUNDED``; PCollection ``is_bounded``
assertion.
yapf 0.43.0 + isort 7.0.0 clean (Beam pinned versions, configs in
``sdks/python/setup.cfg`` and ``sdks/python/.isort.cfg``).
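
The finalize / resume channel separation exercised by the tests above might look roughly like the following. The three field names come from this description; everything else (the dataclass shape, ``ToyRestriction``, ``split_at_checkpoint``, and which half of a split carries which mark) is an assumption made for illustration, not the PR's implementation.

```python
from dataclasses import dataclass
from typing import Any, Optional


@dataclass(frozen=True)
class ToyRestriction:
  """Toy restriction with one field per channel."""
  checkpoint_mark: Optional[Any] = None  # resume channel
  finalization_checkpoint_mark: Optional[Any] = None  # commit-hook channel
  is_done: bool = False


def split_at_checkpoint(mark):
  """Returns (primary, residual) for a self-checkpoint at `mark`.

  Assumed behaviour: the done primary keeps the mark only for its
  finalize callback, and the residual keeps it only for resuming.
  """
  primary = ToyRestriction(finalization_checkpoint_mark=mark, is_done=True)
  residual = ToyRestriction(checkpoint_mark=mark)
  return primary, residual
```

With this shape, nothing the primary's finalize callback reads or writes is stored in the field the residual resumes from.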
## Out of scope (deferred to W2+, tracked under #19137)
Listed exhaustively in the module docstring at
``sdks/python/apache_beam/io/unbounded_source.py``:
* Record-id-based deduplication (Java's ``ValueWithRecordId``).
* Backlog-byte reporting (``restriction_size`` is constant 1;
``current_progress`` is binary 0.0 / 1.0).
* Dynamic split fractions / runner-initiated work stealing.
* Source-specific checkpoint coders threaded through the SDF restriction
coder (today the coder pickles checkpoint marks via
``_MemoizingPickleCoder`` regardless of the source's
``get_checkpoint_mark_coder``).
* Reader caching across bundles (Java caches readers via a Guava cache;
this PoC always rebuilds the reader from the checkpoint).
* ``EmptyUnboundedSource`` terminal-state marker (we use an ``is_done``
flag on the restriction instead).
* Runner-side ``IsBounded.UNBOUNDED`` dispatch in
``bundle_processor.IMPULSE_READ_TRANSFORM``. Today the wire format
round-trips correctly but execution flows through the composite's
expanded sub-transforms.
------------------------
- [x] Mention the appropriate issue in your description -- ``addresses
#19137``.
- [ ] Update ``CHANGES.md`` with noteworthy changes -- intentionally
deferred until W1/W2 scope is finalised on the mailing list; will
add before merge.
- [x] ICLA on file.
cc @yhu for mentor review.