tejasiyer-dev opened a new pull request, #38609:
URL: https://github.com/apache/beam/pull/38609
fixes #38529
R: @AMOOOMA
This PR introduces AsyncDoFn and AsyncDoFnTest to the Apache Beam Java SDK.
AsyncDoFn acts as an execution wrapper around a standard synchronous DoFn,
offloading element processing to a background thread pool. This decouples the
runner's event loop (main thread) from high-latency, I/O-heavy element
processing (background threads), so the main thread no longer blocks on slow
calls. The wrapper also applies backpressure when the pool is full, with the
goal of increasing pipeline throughput.
**1. Ingestion & Local Deduplication (Main Thread)**
- JVM Isolation: Every AsyncDoFn instance generates a UUID upon
instantiation so that its entries in static JVM registries stay isolated from
those of other instances.
- Deduplication Boundary: Incoming elements pass through an idFn to extract
an elementId. If the elementId is already present in the local activeElements
Map in JVM memory, scheduling is skipped so that the same element is not
scheduled twice on this worker.
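
The deduplication check above can be illustrated with a minimal sketch. It is not the code from this PR: `DedupSketch`, `scheduleIfAbsent`, and `trackedCount` are hypothetical names, and a plain `ConcurrentHashMap` stands in for the activeElements Map.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch of the local deduplication check; not the PR's code. */
final class DedupSketch<T> {
  // Stand-in for the per-instance activeElements map (elementId -> future).
  private final Map<String, CompletableFuture<String>> activeElements =
      new ConcurrentHashMap<>();
  // Stand-in for idFn: extracts the elementId from an element.
  private final Function<T, String> idFn;

  DedupSketch(Function<T, String> idFn) {
    this.idFn = idFn;
  }

  /** Returns true if the element was newly tracked, false if its id was already present. */
  boolean scheduleIfAbsent(T element) {
    String elementId = idFn.apply(element);
    // A real implementation would submit work here; the sketch only tracks a placeholder.
    CompletableFuture<String> placeholder = new CompletableFuture<>();
    // putIfAbsent returns null only when no mapping existed for elementId.
    return activeElements.putIfAbsent(elementId, placeholder) == null;
  }

  int trackedCount() {
    return activeElements.size();
  }
}
```

Using `putIfAbsent` makes the check and the registration a single atomic step, which matters if more than one thread can reach the map.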
**2. Backpressure & Capacity Check (Main Thread)**
- Capacity Management: The main thread checks if the background pool's
active task count is below maxItemsToBuffer.
- Exponential Backoff: If the pool is full, the main thread sleeps using
exponential backoff (starting at 10ms, doubling, capped at maxWaitTime / 500ms).
- Timeout Handling: If capacity does not free up within the timeout (default
1s), the main thread stops trying to schedule the task in memory. The element
is written directly to persistent storage (BagState) and a @Timer is
registered to process it later.
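
A rough sketch of the capacity wait described above, using only the JDK. `BackoffSketch`, `nextSleepMs`, and `waitForCapacity` are hypothetical names, and the sleep cap and timeout are passed in as parameters rather than taken from the PR's configuration.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.IntSupplier;

/** Hypothetical sketch of the capacity check with exponential backoff; not the PR's code. */
final class BackoffSketch {
  static final long INITIAL_SLEEP_MS = 10;

  /** Doubles the current sleep and caps it at capMs. */
  static long nextSleepMs(long currentMs, long capMs) {
    return Math.min(currentMs * 2, capMs);
  }

  /**
   * Waits until activeCount drops below maxItemsToBuffer.
   * Returns true if capacity appeared before the timeout, false if the caller
   * should fall back (in the description above: write to BagState and set a timer).
   */
  static boolean waitForCapacity(
      IntSupplier activeCount, int maxItemsToBuffer, long capMs, long timeoutMs) {
    long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
    long sleepMs = INITIAL_SLEEP_MS;
    while (activeCount.getAsInt() >= maxItemsToBuffer) {
      long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
      if (remainingMs <= 0) {
        return false; // timed out while the pool was still full
      }
      try {
        Thread.sleep(Math.min(sleepMs, remainingMs));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        return false;
      }
      sleepMs = nextSleepMs(sleepMs, capMs);
    }
    return true;
  }
}
```

For example, `BackoffSketch.waitForCapacity(pool::getActiveCount, 100, 500, 1000)` would poll a `ThreadPoolExecutor` named `pool` for up to one second.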
**3. Task Creation & Durable State Writing (Main Thread)**
When capacity is available, the main thread performs the following steps
sequentially:
1. Task Creation: Wraps the element logic inside a CompletableFuture and
submits it to the JVM's task queue.
2. In-Memory Tracking: Registers the elementId and its future in the
activeElements Map and increments the itemsInBuffer counter.
3. Durable State Write: Writes the element to the Runner's persistent
BagState (ensuring durability if a worker crashes).
4. Timer Scheduling: Schedules/updates a key-scoped @Timer callback to
manage future reconciliation.
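
The four steps above might look roughly like the following. This is a sketch under stated assumptions, not the PR's implementation: `SubmitSketch` and its methods are hypothetical, a `CopyOnWriteArrayList` stands in for BagState, and a boolean flag stands in for the @Timer. The sketch also increments the counter before submitting, so a task that finishes very quickly cannot decrement it first.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch of task creation and tracking; not the PR's code. */
final class SubmitSketch {
  // Daemon threads so the sketch never keeps the JVM alive.
  private final ExecutorService pool =
      Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r, "async-sketch-worker");
        t.setDaemon(true);
        return t;
      });
  private final Map<String, CompletableFuture<List<String>>> activeElements =
      new ConcurrentHashMap<>();
  private final AtomicInteger itemsInBuffer = new AtomicInteger();
  private final List<String> durableBag = new CopyOnWriteArrayList<>(); // stand-in for BagState
  private volatile boolean timerSet = false; // stand-in for the key-scoped @Timer

  CompletableFuture<List<String>> submit(
      String elementId, String element, Function<String, List<String>> process) {
    itemsInBuffer.incrementAndGet();
    // 1. Task creation: wrap the element logic in a CompletableFuture.
    CompletableFuture<List<String>> future =
        CompletableFuture.supplyAsync(() -> process.apply(element), pool)
            .whenComplete((out, err) -> itemsInBuffer.decrementAndGet());
    // 2. In-memory tracking.
    activeElements.put(elementId, future);
    // 3. Durable state write.
    durableBag.add(element);
    // 4. Timer scheduling.
    timerSet = true;
    return future;
  }

  int itemsInBuffer() { return itemsInBuffer.get(); }
  boolean isTracked(String elementId) { return activeElements.containsKey(elementId); }
  int durableBagSize() { return durableBag.size(); }
  boolean timerSet() { return timerSet; }
}
```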
**4. Background Execution (Background Worker Threads)**
- Decoupled Processing: Worker threads independently pull tasks from the JVM
queue.
- Bundle Lifecycle: The thread executes the full synchronous bundle
lifecycle of the wrapped DoFn (startBundle → processElement →
finishBundle).
- Thread-Safe Accumulation: Workers append outputs to a private
AccumulatingOutputReceiver held in JVM memory, ensuring background threads do
not write downstream unsafely. On completion, the future returns the output
list and itemsInBuffer is decremented.
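
As an illustration of the background lifecycle and private output accumulation, here is a small sketch. `LifecycleSketch` and the `SyncFn` interface are hypothetical stand-ins for the wrapped DoFn and the AccumulatingOutputReceiver; the real Beam DoFn API is annotation-based and differs from this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

/** Hypothetical sketch of the background bundle lifecycle; not the PR's code. */
final class LifecycleSketch {
  /** Simplified stand-in for the wrapped synchronous DoFn. */
  interface SyncFn {
    void startBundle();
    void processElement(String element, Consumer<String> out);
    void finishBundle(Consumer<String> out);
  }

  /** Runs the full lifecycle on a background thread and returns the collected outputs. */
  static CompletableFuture<List<String>> runAsync(SyncFn fn, String element) {
    return CompletableFuture.supplyAsync(() -> {
      // Private to this task: nothing is written downstream from the worker thread.
      List<String> outputs = new ArrayList<>();
      fn.startBundle();
      fn.processElement(element, outputs::add);
      fn.finishBundle(outputs::add);
      return outputs;
    });
  }
}
```

Because each task owns its output list and only hands it over through the future, the main thread is the only place where outputs are emitted downstream.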
**5. Timer Reconciliation & Cleanup (Main Thread)**
When the @Timer fires for Key K, the main thread executes a synchronous
reconciliation cycle:
- Early Exit: If BagState for Key K is empty, it exits immediately to free
up CPU.
- Orphan Cleanup: Scans the activeElements Map and cancels/deletes any
running JVM task belonging to Key K whose ID is not in BagState (e.g., orphans
from failed bundle retries).
- State Reconciliation: Iterates through the elements listed in BagState:
  1. Case 1 (Lost Task): The task is not in activeElements. It is
     immediately rescheduled into the JVM thread pool.
  2. Case 2 (Completed Task): The task is in activeElements and completed.
     The main thread retrieves the outputs from the future, emits them
     downstream, and removes the element from BagState and JVM memory.
  3. Case 3 (In-Flight Task): The task is still running. The main thread
     leaves it in BagState untouched.
- Timer Reset: If any elements remain unfinished, a new timer is scheduled
for the next check cycle.
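
The reconciliation cycle can be sketched as a single method. This is an illustration only: `ReconcileSketch.onTimer` is a hypothetical name, a mutable `List` of element ids stands in for the BagState of one key, and the sketch assumes completed futures finished normally (failed or cancelled futures are not handled).

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical sketch of the timer reconciliation cycle; not the PR's code. */
final class ReconcileSketch {
  /**
   * @param bag element ids held in durable state for this key (stand-in for BagState)
   * @param active in-memory futures for this key (stand-in for activeElements)
   * @param reschedule restarts a lost task and returns its new future
   * @param emitted collects outputs that would be emitted downstream
   * @return true if a new timer should be scheduled
   */
  static boolean onTimer(
      List<String> bag,
      Map<String, CompletableFuture<List<String>>> active,
      Function<String, CompletableFuture<List<String>>> reschedule,
      List<String> emitted) {
    // Early exit: nothing pending for this key.
    if (bag.isEmpty()) {
      return false;
    }
    // Orphan cleanup: cancel and drop tasks whose id is not in durable state.
    active.entrySet().removeIf(e -> {
      if (!bag.contains(e.getKey())) {
        e.getValue().cancel(true);
        return true;
      }
      return false;
    });
    // State reconciliation over the ids in durable state.
    java.util.Iterator<String> ids = bag.iterator();
    while (ids.hasNext()) {
      String id = ids.next();
      CompletableFuture<List<String>> future = active.get(id);
      if (future == null) {
        active.put(id, reschedule.apply(id)); // Case 1: lost task
      } else if (future.isDone()) {
        emitted.addAll(future.join()); // Case 2: completed task
        ids.remove();
        active.remove(id);
      }
      // Case 3: in-flight task, left untouched.
    }
    // Timer reset: needed only while unfinished elements remain.
    return !bag.isEmpty();
  }
}
```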
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [ ] Mention the appropriate issue in your description (for example:
`addresses #123`), if applicable. This will automatically add a link to the
pull request in the issue. If you would like the issue to automatically close
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI or the [workflows
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md)
to see a list of phrases to trigger workflows.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]