tejasiyer-dev opened a new pull request, #38609:
URL: https://github.com/apache/beam/pull/38609

   fixes #38529
   
   R: @AMOOOMA 
   
   This PR introduces AsyncDoFn and AsyncDoFnTest to the Apache Beam Java SDK.
   
   AsyncDoFn acts as an execution wrapper around a standard synchronous DoFn, 
offloading element processing to a background thread pool. Decoupling the 
runner's event loop (main thread) from high-latency, I/O-heavy element 
processing (background threads) keeps the main thread from blocking on each 
element, applies backpressure through a bounded buffer, and can significantly 
increase pipeline throughput.
   
   **1. Ingestion & Local Deduplication (Main Thread)**
   
   - JVM Isolation: Every AsyncDoFn instance generates a unique UUID upon 
instantiation, keeping its entries in static, JVM-wide registries isolated from 
those of other instances.
   - Deduplication Boundary: Incoming elements pass through an idFn to extract 
an elementId. If the elementId is already present in the local activeElements 
Map in JVM memory, scheduling is skipped so that an element already in flight 
is not processed a second time.
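   A minimal plain-Java sketch of the two ideas above: a static, JVM-wide 
registry partitioned by a per-instance UUID, and a deduplication check on the 
ID returned by idFn. `AsyncRegistrySketch`, `REGISTRY`, `shouldSchedule`, and 
`register` are hypothetical names chosen for illustration, not the PR's code; 
the sketch assumes element IDs are strings and that a `ConcurrentHashMap` is a 
reasonable stand-in for the activeElements Map.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical sketch: per-instance isolation inside a static, JVM-wide registry. */
class AsyncRegistrySketch<T> {
  // Shared by every instance in this JVM, partitioned by each instance's UUID.
  static final Map<String, Map<String, CompletableFuture<?>>> REGISTRY =
      new ConcurrentHashMap<>();

  // Generated once per instance, so two wrappers never read each other's entries.
  final String instanceId = UUID.randomUUID().toString();
  private final Function<T, String> idFn;

  AsyncRegistrySketch(Function<T, String> idFn) {
    this.idFn = idFn;
    REGISTRY.put(instanceId, new ConcurrentHashMap<>());
  }

  /** This instance's activeElements view: elementId -> in-flight future. */
  Map<String, CompletableFuture<?>> activeElements() {
    return REGISTRY.get(instanceId);
  }

  /** Deduplication boundary: skip scheduling if the element's ID is already active. */
  boolean shouldSchedule(T element) {
    return !activeElements().containsKey(idFn.apply(element));
  }

  /** Records the future of an element once it has been scheduled. */
  void register(T element, CompletableFuture<?> future) {
    activeElements().put(idFn.apply(element), future);
  }
}
```

Keeping the registry static lets state survive across bundles on the same 
worker, while the UUID partition stops two different AsyncDoFn instances from 
deduplicating against each other.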
   
   **2. Backpressure & Capacity Check (Main Thread)**
   
   - Capacity Management: The main thread checks if the background pool's 
active task count is below maxItemsToBuffer.
   - Exponential Backoff: If the pool is full, the main thread sleeps using 
exponential backoff (starting at 10ms, doubling, capped at maxWaitTime / 500ms).
   - Timeout Handling: If capacity doesn't clear within the timeout (default 
1s), the main thread stops trying to schedule the task. Instead, the element is 
written directly to persistent storage (BagState) and a @Timer is registered to 
process it later.
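   The capacity check and backoff loop could be sketched as follows. This is an 
illustration, not the PR's code: `BackpressureSketch` and `waitForCapacity` are 
hypothetical names, and the active-task count is passed in as an `IntSupplier` 
(for example an `AtomicInteger` counter such as itemsInBuffer, or 
`ThreadPoolExecutor::getActiveCount`). The 10 ms starting delay, doubling, and 
per-sleep cap mirror the numbers described above.

```java
import java.util.function.IntSupplier;

/** Hypothetical sketch of the capacity check with capped exponential backoff. */
class BackpressureSketch {
  /**
   * Waits until activeTasks is below maxItemsToBuffer. The first sleep is 10 ms,
   * each retry doubles it (capped at maxWaitMillis), and the wait gives up once
   * timeoutMillis has elapsed.
   *
   * @return true if capacity became available; false on timeout or interrupt, in
   *     which case the caller would write the element to BagState and set a timer.
   */
  static boolean waitForCapacity(
      IntSupplier activeTasks, int maxItemsToBuffer, long maxWaitMillis, long timeoutMillis) {
    long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
    long sleepMillis = Math.min(10, maxWaitMillis);
    while (activeTasks.getAsInt() >= maxItemsToBuffer) {
      long remainingMillis = (deadline - System.nanoTime()) / 1_000_000L;
      if (remainingMillis <= 0) {
        return false; // Capacity never cleared within the timeout.
      }
      try {
        // Never sleep past the deadline.
        Thread.sleep(Math.min(sleepMillis, remainingMillis));
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // Preserve the interrupt flag.
        return false;
      }
      sleepMillis = Math.min(sleepMillis * 2, maxWaitMillis);
    }
    return true;
  }
}
```

Returning false instead of blocking indefinitely is what allows the caller to 
fall back to BagState plus a timer rather than stalling the runner's thread.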
   
   **3. Task Creation & Durable State Writing (Main Thread)**
   
   When capacity is available, the main thread performs the following steps 
sequentially:
   
   1. Task Creation: Wraps the element logic inside a CompletableFuture and 
submits it to the JVM's task queue.
   2. In-Memory Tracking: Registers the elementId and its future in the 
activeElements Map and increments the itemsInBuffer counter.
   3. Durable State Write: Writes the element to the Runner's persistent 
BagState (ensuring durability if a worker crashes).
   4. Timer Scheduling: Schedules/updates a key-scoped @Timer callback to 
manage future reconciliation.
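   The four steps above can be sketched in plain Java. Assumptions: 
`SchedulerSketch` is a hypothetical name; an in-memory `List` stands in for 
BagState and a timestamp field stands in for the key-scoped @Timer (in a real 
DoFn these would be Beam's `BagState.add(...)` and a `Timer` set call); the work 
function returns an element's outputs as a list. One deliberate deviation from 
the listed order: the sketch increments itemsInBuffer before submitting the 
task, so a very fast task cannot decrement the counter first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical sketch of the scheduling steps, with in-memory stand-ins for Beam state. */
class SchedulerSketch<T, R> {
  final Map<String, CompletableFuture<List<R>>> activeElements = new ConcurrentHashMap<>();
  final AtomicInteger itemsInBuffer = new AtomicInteger();
  // Stand-in for the runner's BagState; only the main thread touches it.
  final List<T> bagState = new ArrayList<>();
  // Stand-in for a key-scoped timer: the next firing time, in millis.
  long timerFireAtMillis = Long.MAX_VALUE;

  private final ExecutorService pool;
  private final Function<T, String> idFn;
  private final Function<T, List<R>> work;

  SchedulerSketch(ExecutorService pool, Function<T, String> idFn, Function<T, List<R>> work) {
    this.pool = pool;
    this.idFn = idFn;
    this.work = work;
  }

  void schedule(T element, long nowMillis, long timerDelayMillis) {
    String elementId = idFn.apply(element);
    // Counted before submission so the decrement below can never run first.
    itemsInBuffer.incrementAndGet();
    // 1. Task creation: run the element's work on the background pool.
    CompletableFuture<List<R>> future =
        CompletableFuture.supplyAsync(() -> work.apply(element), pool)
            .whenComplete((outputs, error) -> itemsInBuffer.decrementAndGet());
    // 2. In-memory tracking.
    activeElements.put(elementId, future);
    // 3. Durable state write (stand-in for BagState.add).
    bagState.add(element);
    // 4. Timer scheduling: keep the earliest pending firing time.
    timerFireAtMillis = Math.min(timerFireAtMillis, nowMillis + timerDelayMillis);
  }
}
```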
   
   **4. Background Execution (Background Worker Threads)**
   
   - Decoupled Processing: Worker threads independently pull tasks from the JVM 
queue.
   - Bundle Lifecycle: The thread executes the full synchronous bundle 
lifecycle of the wrapped DoFn (startBundle → processElement → finishBundle).
   - Thread-Safe Accumulation: Workers append outputs to a private 
AccumulatingOutputReceiver held in JVM memory, so background threads never 
write downstream directly. When the work finishes, the future completes with 
the output list and itemsInBuffer is decremented.
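   A sketch of the worker side, illustrating why per-task accumulation is 
thread-safe: every task creates its own receiver, so no output list is shared 
between threads, and the main thread sees outputs only through the completed 
future. `SyncFn`, `AccumulatingReceiver`, and `BackgroundWorkerSketch` are 
hypothetical stand-ins for the wrapped DoFn and the PR's 
AccumulatingOutputReceiver. The sketch assumes the wrapped function is safe to 
call from several threads at once (for example, because it is stateless); the 
description does not say how DoFn instances are managed per thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/** Hypothetical stand-in for the lifecycle methods of the wrapped synchronous DoFn. */
interface SyncFn<T, R> {
  default void startBundle() {}

  void processElement(T element, Consumer<R> out);

  default void finishBundle(Consumer<R> out) {}
}

/** Hypothetical per-task receiver: outputs stay in memory until the main thread emits them. */
class AccumulatingReceiver<R> implements Consumer<R> {
  private final List<R> outputs = new ArrayList<>();

  @Override
  public void accept(R value) {
    outputs.add(value);
  }

  List<R> outputs() {
    return Collections.unmodifiableList(outputs);
  }
}

/** Hypothetical sketch of one background task running a full bundle for one element. */
class BackgroundWorkerSketch {
  static <T, R> CompletableFuture<List<R>> runOnPool(
      SyncFn<T, R> fn, T element, ExecutorService pool) {
    return CompletableFuture.supplyAsync(
        () -> {
          // A fresh receiver per task: no list is ever shared between threads.
          AccumulatingReceiver<R> receiver = new AccumulatingReceiver<>();
          fn.startBundle();
          fn.processElement(element, receiver);
          fn.finishBundle(receiver);
          // Outputs reach the main thread only through the completed future.
          return receiver.outputs();
        },
        pool);
  }
}
```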
    
   **5. Timer Reconciliation & Cleanup (Main Thread)**
   
   When the @Timer fires for Key K, the main thread executes a synchronous 
reconciliation cycle:
   
   - Early Exit: If BagState for Key K is empty, the callback returns 
immediately, avoiding unnecessary work.
   - Orphan Cleanup: Scans the activeElements Map and cancels/deletes any 
running JVM task belonging to Key K whose ID is not in BagState (e.g., orphans 
from failed bundle retries).
   - State Reconciliation: Iterates through the elements listed in BagState:
   
       1. Case 1 (Lost Task): The task is not in activeElements. It is 
immediately rescheduled into the JVM thread pool.
       2. Case 2 (Completed Task): The task is in activeElements and has 
completed. The main thread retrieves the outputs from the future, emits them 
downstream, and removes the element from BagState and JVM memory.
       3. Case 3 (In-Flight Task): The task is still running. The main thread 
leaves it in BagState untouched.
   - Timer Reset: If any elements remain unfinished, a new timer is scheduled 
for the next check cycle.
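   One reconciliation pass for a single key might look like the sketch below. 
It uses in-memory stand-ins: a `List` for the key's BagState, and a map from 
elementId to a hypothetical `Tracked` record (owning key plus future) for 
activeElements. `ReconcileSketch`, `onTimer`, and the `reschedule`/`emit` 
callbacks are illustrative names, not the PR's API. Two assumptions go beyond 
the description: a task whose future failed or was cancelled is treated like a 
lost task and rescheduled, and because Beam's BagState has no per-element 
removal, the bag is rewritten with the remaining elements (clear, then add). 
The boolean result says whether a new timer should be set.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/** Hypothetical sketch of one timer-driven reconciliation pass for a single key. */
class ReconcileSketch {
  /** Hypothetical bookkeeping: an in-flight task plus the key it belongs to. */
  static final class Tracked<R> {
    final String key;
    final CompletableFuture<List<R>> future;

    Tracked(String key, CompletableFuture<List<R>> future) {
      this.key = key;
      this.future = future;
    }
  }

  /** Returns true if work is still outstanding and a new timer should be set. */
  static <T, R> boolean onTimer(
      String key,
      List<T> bag, // stand-in for this key's BagState; rewritten in place
      Function<T, String> idFn,
      Map<String, Tracked<R>> activeElements,
      Function<T, CompletableFuture<List<R>>> reschedule,
      Consumer<R> emit) {
    // Early exit: nothing buffered for this key.
    if (bag.isEmpty()) {
      return false;
    }
    Set<String> bagIds = new HashSet<>();
    for (T element : bag) {
      bagIds.add(idFn.apply(element));
    }
    // Orphan cleanup: this key's tasks whose IDs are no longer in state.
    activeElements.entrySet().removeIf(entry -> {
      boolean orphan = entry.getValue().key.equals(key) && !bagIds.contains(entry.getKey());
      if (orphan) {
        entry.getValue().future.cancel(true);
      }
      return orphan;
    });
    List<T> remaining = new ArrayList<>();
    for (T element : bag) {
      String id = idFn.apply(element);
      Tracked<R> tracked = activeElements.get(id);
      if (tracked != null && tracked.future.isDone() && !tracked.future.isCompletedExceptionally()) {
        // Case 2: completed -> emit outputs, drop from memory and from state.
        tracked.future.join().forEach(emit);
        activeElements.remove(id);
      } else if (tracked == null || tracked.future.isDone()) {
        // Case 1: lost (or, by assumption, failed) -> reschedule, keep in state.
        activeElements.put(id, new Tracked<>(key, reschedule.apply(element)));
        remaining.add(element);
      } else {
        // Case 3: still in flight -> leave it in state untouched.
        remaining.add(element);
      }
    }
    bag.clear();
    bag.addAll(remaining);
    return !remaining.isEmpty();
  }
}
```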
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [ ] Mention the appropriate issue in your description (for example: 
`addresses #123`), if applicable. This will automatically add a link to the 
pull request in the issue. If you would like the issue to automatically close 
on merging the pull request, comment `fixes #<ISSUE NUMBER>` instead.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://github.com/apache/beam/blob/master/CONTRIBUTING.md#make-the-reviewers-job-easier).
   
   To check the build health, please visit 
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
   
   GitHub Actions Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   [![Build python source distribution and 
wheels](https://github.com/apache/beam/actions/workflows/build_wheels.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python 
tests](https://github.com/apache/beam/actions/workflows/python_tests.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java 
tests](https://github.com/apache/beam/actions/workflows/java_tests.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Go 
tests](https://github.com/apache/beam/actions/workflows/go_tests.yml/badge.svg?event=schedule&&?branch=master)](https://github.com/apache/beam/actions?query=workflow%3A%22Go+tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI or the [workflows 
README](https://github.com/apache/beam/blob/master/.github/workflows/README.md) 
to see a list of phrases to trigger workflows.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
