Eliaaazzz opened a new pull request, #38723:
URL: https://github.com/apache/beam/pull/38723

   Adds a Splittable-DoFn wrapper that brings Java's ``UnboundedSource`` /
   ``UnboundedReader`` / ``CheckpointMark`` abstractions to the Python SDK and
   makes them runnable on the portable Fn API (DirectRunner / FnApiRunner).
   Wires the new source type into ``iobase.Read.expand()`` so
   ``p | beam.io.Read(my_unbounded_source)`` dispatches alongside the existing
   ``BoundedSource`` branch. Loosely inspired by Java's
   ``Read.UnboundedSourceAsSDFWrapperFn`` -- not a literal port. The
   streaming-SDF template followed for the process loop / watermark / defer
   plumbing is ``apache_beam.transforms.periodicsequence``.
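   To make the three abstractions concrete, here is a stdlib-only sketch of what a user-side source might look like. It does not import the PR's module. ``UnboundedSource`` below is a bare stand-in base class, and ``CountingSource`` / ``CountingReader`` / ``OffsetMark`` are hypothetical. This description names ``advance``, ``get_watermark`` and ``split(desired_num_splits, options)``. The remaining method names (``start``, ``get_current``, ``get_current_timestamp``, ``get_checkpoint_mark``, ``create_reader``, ``finalize_checkpoint``) are assumptions modelled on Java's ``UnboundedSource`` / ``UnboundedReader``. The hypothetical ``initial_restrictions`` helper illustrates the split-with-fallback fan-out described under "Correctness highlights".

```python
import logging
from typing import List, Optional

_LOG = logging.getLogger(__name__)


class UnboundedSource:
  """Stand-in base class; the PR's real ABC lives in unbounded_source.py."""


class OffsetMark:  # hypothetical; would subclass the PR's CheckpointMark
  def __init__(self, next_offset: int):
    self.next_offset = next_offset

  def finalize_checkpoint(self) -> None:
    pass  # a real source would acknowledge everything before next_offset


class CountingReader:  # hypothetical; would subclass the PR's UnboundedReader
  def __init__(self, lo: int, hi: int, mark: Optional[OffsetMark]):
    self._hi = hi
    self._next = mark.next_offset if mark is not None else lo
    self._current: Optional[int] = None

  def start(self) -> bool:
    return self.advance()

  def advance(self) -> bool:
    if self._next >= self._hi:
      return False  # "no data right now"; an unbounded reader may get more
    self._current = self._next
    self._next += 1
    return True

  def get_current(self) -> int:
    return self._current

  def get_current_timestamp(self) -> float:
    return float(self._current)  # toy choice: event time == offset

  def get_watermark(self) -> float:
    # Every future record has timestamp >= self._next, so this is conservative.
    return float(self._next - 1)

  def get_checkpoint_mark(self) -> OffsetMark:
    return OffsetMark(self._next)  # resume at the first unread offset

  def close(self) -> None:
    pass


class CountingSource(UnboundedSource):
  """Hypothetical source emitting the integers in [lo, hi)."""

  def __init__(self, lo: int, hi: int):
    self.lo, self.hi = lo, hi

  def split(self, desired_num_splits: int, options) -> List["CountingSource"]:
    step = max(1, -(-(self.hi - self.lo) // desired_num_splits))  # ceil div
    return [CountingSource(i, min(i + step, self.hi))
            for i in range(self.lo, self.hi, step)]

  def create_reader(self, options, checkpoint_mark: Optional[OffsetMark]):
    return CountingReader(self.lo, self.hi, checkpoint_mark)


def initial_restrictions(source, options=None, desired_num_splits=20):
  """Fan-out with a single-restriction fallback if split() refuses."""
  try:
    splits = source.split(desired_num_splits, options)
  except Exception:
    _LOG.warning("split() refused; reading as a single restriction",
                 exc_info=True)
    return [source]
  # Validation sits outside the try, so a wrong return type fails loudly
  # instead of being swallowed by the fallback.
  for sub in splits:
    if not isinstance(sub, UnboundedSource):
      raise TypeError(f"split() returned {type(sub).__name__}")
  return splits
```

   In this sketch, a reader created with ``create_reader(options, OffsetMark(2))`` continues at offset 2. A resumed residual depends on exactly this property.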
   
   Status: **draft** -- opening for early feedback from @yhu (mentor) while a
   design-first thread goes to the ``dev@`` mailing list in parallel. The MVP is
   deliberately small; see the "Out of scope" list below.
   
   addresses #19137
   
   ## What's in this PR
   
   Two new files under ``sdks/python/apache_beam/io/``:
   
   * ``unbounded_source.py`` (~745 lines incl. docstrings): public ABCs
     (``CheckpointMark``, ``UnboundedReader``, ``UnboundedSource``,
     ``ReadFromUnboundedSource``) + SDF wrapper internals
     (``_UnboundedSourceRestriction``, ``_UnboundedSourceRestrictionCoder``,
     ``_UnboundedSourceRestrictionTracker``,
     ``_UnboundedSourceRestrictionProvider``).
   * ``unbounded_source_test.py`` (~1130 lines): unit + integration coverage.
   
   Plus a small change to ``iobase.py`` (~26 lines):
   
   * ``Read.expand()`` gains an ``UnboundedSource`` branch (lazy import to break
     the ``iobase`` <-> ``unbounded_source`` cycle) that delegates to
     ``ReadFromUnboundedSource``.
   * ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to
     ``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` +
     ``IsBounded.UNBOUNDED``. Decoding rides the existing ``PICKLED_SOURCE``
     URN registered on ``SourceBase``.
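   The dispatch and ``IsBounded`` choice can be sketched with stand-in classes. Everything here is hypothetical. ``expand_branch`` and ``is_bounded_param`` only name the path that the real ``Read.expand()`` / ``Read.to_runner_api_parameter`` would take, and ``IsBounded`` is a local enum, not the runner API proto. The point is that the source's type alone decides both the expansion branch and the ``IsBounded`` value.

```python
import enum


class IsBounded(enum.Enum):  # local stand-in, not the runner API proto enum
  BOUNDED = enum.auto()
  UNBOUNDED = enum.auto()


class SourceBase: ...
class BoundedSource(SourceBase): ...
class UnboundedSource(SourceBase): ...


def expand_branch(source: SourceBase) -> str:
  """Names the path a Read.expand() shaped like the PR's would take."""
  if isinstance(source, BoundedSource):
    return "existing BoundedSource path"
  if isinstance(source, UnboundedSource):
    # The PR lazy-imports the unbounded_source module here (breaking the
    # iobase <-> unbounded_source cycle) and delegates to
    # ReadFromUnboundedSource.
    return "ReadFromUnboundedSource"
  return "pre-existing behaviour"  # other SourceBase types are unchanged


def is_bounded_param(source: SourceBase) -> IsBounded:
  """Mirrors the widened (BoundedSource, UnboundedSource) isinstance check."""
  if not isinstance(source, (BoundedSource, UnboundedSource)):
    raise NotImplementedError("other source types are not modelled here")
  if isinstance(source, UnboundedSource):
    return IsBounded.UNBOUNDED
  return IsBounded.BOUNDED
```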
   
   ## Correctness highlights
   
   * **Watermark on data path** uses ``reader.get_watermark()`` (Java
     ``Read.java:594`` parity), not the per-record event time. The per-record
     holder is ``(value, record_ts, source_wm)``: the record's event time labels
     the output ``TimestampedValue``, while the source watermark advances the
     watermark estimator.
   * **Restriction has separate channels** for resume (``checkpoint_mark``)
     and commit hook (``finalization_checkpoint_mark``) so a done primary's
     finalize callback cannot contaminate the residual's resume state.
   * **Reader is closed** on every exit path. EOF and split paths close it
     inside the tracker; ``try_claim`` / ``try_split`` wrap their reader-method
     calls and close the reader before re-raising; and the DoFn's ``finally``
     adds defense-in-depth for exceptions raised downstream of a ``yield``. That
     last path reaches the reader through the SDF wrapper's private attribute
     chain, guarded by an ``isinstance`` check, and logs a warning if the chain
     is ever refactored upstream.
   * **EOF advances the watermark estimator to ``MAX_TIMESTAMP``** so
     downstream event-time windows can close (otherwise they would stall at
     the last reported watermark).
   * **Initial fan-out** via ``UnboundedSource.split(desired_num_splits,
     options)`` with ``desired_num_splits=20``. The returned sub-sources are
     validated as ``UnboundedSource`` instances (``TypeError`` if not, raised
     outside the split-refusal ``except``); if ``split`` refuses by raising,
     the wrapper falls back to a single restriction and logs a WARNING.
   * **``default_output_coder`` wired** via
     ``coders.registry.register_coder`` + ``element_type`` so a custom
     source-declared coder reaches the output PCollection through Beam's
     standard registry lookup (parameterised coders still require explicit
     user registration; logged as a warning).
   * **``poll_interval_seconds`` validated** to be > 0 in
     ``ReadFromUnboundedSource.__init__``.
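   The restriction, tracker and process-loop points above can be modelled together in plain Python. This is a sketch, not the PR's code:

   * ``Restriction``, ``Tracker`` and ``process`` are hypothetical stand-ins. The tracker's ``checkpoint`` plays the role of ``try_split``.
   * ``MAX_TIMESTAMP`` is a float stand-in for Beam's constant.
   * The reader method names follow Java's ``UnboundedReader`` and are assumed.
   * Treating "no data plus a source watermark at ``MAX_TIMESTAMP``" as EOF is also an assumption, since the description does not say how EOF is surfaced.

   String return values replace ``ProcessContinuation`` so the sketch runs without Beam.

```python
import dataclasses
from typing import Any, Callable, Optional, Tuple

MAX_TIMESTAMP = float("inf")  # stand-in for Beam's MAX_TIMESTAMP


@dataclasses.dataclass(frozen=True)
class Restriction:  # hypothetical mirror of _UnboundedSourceRestriction
  source: Any
  checkpoint_mark: Optional[Any] = None               # resume channel
  finalization_checkpoint_mark: Optional[Any] = None  # commit-hook channel
  is_done: bool = False


class Tracker:  # hypothetical mirror of _UnboundedSourceRestrictionTracker
  NO_DATA = object()
  EOF = object()

  def __init__(self, restriction: Restriction, reader: Any):
    self.restriction = restriction
    self._reader = reader
    self._started = False
    self._closed = False

  def close_reader(self) -> None:
    if not self._closed:  # idempotent: several exit paths may call this
      self._closed = True
      self._reader.close()

  def try_claim(self) -> Any:
    """Returns a (value, record_ts, source_wm) holder, NO_DATA or EOF."""
    try:
      ok = self._reader.advance() if self._started else self._reader.start()
      self._started = True
      if ok:
        return (self._reader.get_current(),
                self._reader.get_current_timestamp(),
                self._reader.get_watermark())
      if self._reader.get_watermark() >= MAX_TIMESTAMP:  # assumed EOF signal
        self.close_reader()
        self.restriction = dataclasses.replace(self.restriction, is_done=True)
        return Tracker.EOF
      return Tracker.NO_DATA
    except Exception:
      self.close_reader()  # close before re-raising
      raise

  def checkpoint(self) -> Tuple[Restriction, Restriction]:
    """Splits at the current position into (done primary, residual)."""
    try:
      mark = self._reader.get_checkpoint_mark()
    finally:
      self.close_reader()
    # The primary records the mark to finalize after commit; the residual
    # records the mark to resume from. Different fields, so finalizing the
    # primary cannot touch the residual's resume state.
    primary = dataclasses.replace(
        self.restriction, finalization_checkpoint_mark=mark, is_done=True)
    residual = Restriction(self.restriction.source, checkpoint_mark=mark)
    self.restriction = primary
    return primary, residual


def process(tracker: Tracker, emit: Callable[[Any, float], None],
            set_watermark: Callable[[float], None]) -> str:
  """One process() call; returns 'stop' at EOF, otherwise 'defer'."""
  try:
    while True:
      claimed = tracker.try_claim()
      if claimed is Tracker.EOF:
        set_watermark(MAX_TIMESTAMP)  # lets downstream event-time windows close
        return "stop"
      if claimed is Tracker.NO_DATA:
        return "defer"  # poll again after poll_interval_seconds
      value, record_ts, source_wm = claimed
      emit(value, record_ts)    # record event time labels the output
      set_watermark(source_wm)  # source watermark, not record_ts, drives this
  except BaseException:
    # The PR uses a `finally`; closing only on errors here keeps the reader
    # open for the checkpoint() that follows a 'defer'.
    tracker.close_reader()
    raise
```

   After a ``'defer'``, calling ``tracker.checkpoint()`` returns two restrictions. The done primary's ``finalization_checkpoint_mark`` is the mark to finalize after commit. The residual's ``checkpoint_mark`` is the mark to resume from.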
   
   ## Test coverage
   
   58 tests, all green locally on Python 3.13 + Beam 2.71:
   
   * ``unbounded_source_test.py`` (42): ABC contracts; restriction coder
     round-trip; restriction tracker state machine (claim / split / EOF /
     no-data / check_done / progress / is_bounded); finalize idempotency;
     source-watermark vs. record-timestamp regression; finalize/resume
     channel separation; tracker-internal exception close on
     ``reader.advance`` and ``reader.get_watermark`` failures; DoFn
     generator close path (unit + integration with downstream raising
     ``Map``); cloudpickle round-trip for transform and source; circular
     import in three orderings via subprocess + tempfile; e2e DirectRunner
     pipeline (records in order + windowed GroupByKey).
   * ``iobase_test.py`` (+3): ``Read(UnboundedSource)`` dispatch through the
     new ``expand`` branch; ``Read.to_runner_api / from_runner_api``
     round-trip with ``IsBounded.UNBOUNDED``; PCollection ``is_bounded``
     assertion.
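   The circular-import check in three orderings can be reproduced with a small harness. In this sketch, the hypothetical modules ``base_mod`` and ``unb_mod`` stand in for ``iobase`` and ``unbounded_source``. The function-local import in ``expand`` plays the role of the lazy import in ``Read.expand()``. The harness illustrates the technique; it is not the PR's test.

```python
import os
import pathlib
import subprocess
import sys
import tempfile
import textwrap

# Hypothetical stand-ins for iobase and unbounded_source. unb_mod imports
# base_mod eagerly; base_mod only imports unb_mod inside expand() (lazily),
# which keeps the cycle from failing at import time.
BASE_MOD = textwrap.dedent("""
    class SourceBase:
        pass

    class BoundedSource(SourceBase):
        pass

    def expand(source):
        from unb_mod import UnboundedSource  # lazy import breaks the cycle
        return "unbounded" if isinstance(source, UnboundedSource) else "bounded"
""")
UNB_MOD = textwrap.dedent("""
    import base_mod

    class UnboundedSource(base_mod.SourceBase):
        pass
""")
ORDERINGS = (
    "import base_mod",
    "import unb_mod",
    "import unb_mod, base_mod; "
    "assert base_mod.expand(unb_mod.UnboundedSource()) == 'unbounded'",
)


def check_orderings() -> dict:
  """Maps each import ordering to the exit code of a fresh interpreter."""
  results = {}
  with tempfile.TemporaryDirectory() as tmp:
    root = pathlib.Path(tmp)
    (root / "base_mod.py").write_text(BASE_MOD)
    (root / "unb_mod.py").write_text(UNB_MOD)
    env = {**os.environ, "PYTHONPATH": tmp}
    for stmt in ORDERINGS:
      # One process per ordering: inside a single interpreter, sys.modules
      # would cache the first import and hide failures in later orderings.
      proc = subprocess.run([sys.executable, "-c", stmt], env=env,
                            capture_output=True, text=True, check=False)
      results[stmt] = proc.returncode
  return results
```

   Moving the ``from unb_mod import UnboundedSource`` line to the top of ``base_mod`` should make the first two orderings fail. With an eager import, whichever module loads first meets a partially initialised partner.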
   
   ## Out of scope (deferred to W2+, tracked under #19137)
   
   Listed exhaustively in the module docstring at
   ``sdks/python/apache_beam/io/unbounded_source.py``:
   
   * Record-id-based deduplication (Java's ``ValueWithRecordId``).
   * Backlog-byte reporting (``restriction_size`` is constant 1;
     ``current_progress`` is binary 0.0 / 1.0).
   * Dynamic split fractions / runner-initiated work stealing.
   * Source-specific checkpoint coders threaded through the SDF restriction
     coder (today the coder always pickles checkpoint marks via
     ``_MemoizingPickleCoder`` regardless of the source's
     ``get_checkpoint_mark_coder``).
   * Reader caching across bundles (Java caches readers via a Guava cache;
     this PoC always rebuilds the reader from the checkpoint).
   * ``EmptyUnboundedSource`` terminal-state marker (we use an ``is_done``
     flag on the restriction instead).
   * Runner-side ``IsBounded.UNBOUNDED`` dispatch in
     ``bundle_processor.IMPULSE_READ_TRANSFORM``. Today the wire format
     round-trips correctly but execution flows through the composite's
     expanded sub-transforms (``Impulse | Map | SDF-ParDo``), not the URN
     handler.
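   For reference, the MVP behaviour behind the coder and progress bullets can be sketched as follows. The names are hypothetical, and stdlib ``pickle`` stands in for ``_MemoizingPickleCoder``. The sketch only shows three things: the checkpoint mark travels inside a pickled restriction, ``restriction_size`` is the constant 1, and progress flips from 0.0 to 1.0 only when the restriction is done.

```python
import dataclasses
import pickle
from typing import Any, Optional


@dataclasses.dataclass(frozen=True)
class Restriction:  # hypothetical mirror of _UnboundedSourceRestriction
  source: Any
  checkpoint_mark: Optional[Any] = None
  finalization_checkpoint_mark: Optional[Any] = None
  is_done: bool = False


class PicklingRestrictionCoder:
  """Pickles the whole restriction, checkpoint marks included.

  Any source-specific checkpoint-mark coder is ignored, mirroring the MVP
  limitation. Stdlib pickle stands in for Beam's pickling coders.
  """

  def encode(self, restriction: Restriction) -> bytes:
    return pickle.dumps(restriction)

  def decode(self, encoded: bytes) -> Restriction:
    return pickle.loads(encoded)


def restriction_size(restriction: Restriction) -> int:
  del restriction  # unused: there is no backlog-byte estimate yet
  return 1


def current_progress(restriction: Restriction) -> float:
  # Binary progress: nothing is known about the position within the stream.
  return 1.0 if restriction.is_done else 0.0
```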
   
   ------------------------
   
    - [x] Mention the appropriate issue in your description -- ``addresses
      #19137``.
    - [ ] Update ``CHANGES.md`` with noteworthy changes -- intentionally
      deferred until W1/W2 scope is finalised on ``dev@``; will add before
      the PR moves out of draft.
    - [ ] ICLA -- to be confirmed by mentor (large contribution under GSoC
      2026).
   
   cc @yhu for mentor review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
