gemini-code-assist[bot] commented on code in PR #38609:
URL: https://github.com/apache/beam/pull/38609#discussion_r3326269830


##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java:
##########
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps a dofn and converts it from one which process elements 
synchronously to one
+ * which processes them asynchronously.
+ *
+ * <p>For synchronous dofns the default settings mean that many (100s) of 
elements will be processed
+ * in parallel and that processing an element will block all other work on 
that key. In addition
+ * runners are optimized for latencies less than a few seconds and longer 
operations can result in
+ * high retry rates. Async should be considered when the default parallelism 
is not correct and/or
+ * items are expected to take longer than a few seconds to process.
+ *
+ * <p>/* NOTE: 1) The wrapped syncFn REQUIRES thread-safety if BOTH 
parallelism > 1 and the DoFn is
+ * stateful. 2) Tagged output multi-outputs are unsupported. 3) 
StartBundle/finishBundle are invoked
+ * per element so any batching or aggregation logic will not behave as 
expected.
+ */
+public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, 
OutputT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class);
+
+  private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10;
+  private static final int DEFAULT_TIMEOUT_SEC = 1;
+  private static final int DEFAULT_MAX_WAIT_TIME_MS = 500;
+  private static final int TEARDOWN_AWAIT_SEC = 5;
+  private static final int INITIAL_BACKOFF_SLEEP_MS = 10;
+  private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000;
+
+  @StateId("to_process")
+  private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec;
+
+  @TimerId("timer")
+  private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+  private final DoFn<InputT, OutputT> syncFn;
+  private final int parallelism;
+  private final Duration timerFrequency;
+  private final int maxItemsToBuffer;
+  private final Duration timeout;
+  private final Duration maxWaitTime;
+  private final SerializableFunction<InputT, Object> idFn;
+  private final boolean useThreadPool;
+  private final String uuid;
+
+  private transient volatile @Nullable PipelineOptions pipelineOptions;
+
+  // Shared JVM-Wide States (Static Registries)
+  // Map-backed registry holding shared resources across serialized worker 
instances. Since runners
+  // clone DoFn instances on the same worker node, static maps ensure safe 
JVM-wide resource reuse.
+  private static final ConcurrentHashMap<String, ExecutorService> pool = new 
ConcurrentHashMap<>();
+  // activeElements (processingElements) is global JVM memory (all keys)
+  private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, 
InFlightElement<?>>>
+      processingElements = new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
+      new ConcurrentHashMap<>();
+  // Reference counts for cloned instances sharing the same UUID. Coordinates 
safe,
+  // leak-free thread pool shutdown during teardown without crashing active 
sibling clones.
+  private static final ConcurrentHashMap<String, AtomicInteger> refCounts =
+      new ConcurrentHashMap<>();
+
+  // If contention becomes a bottleneck, this can be replaced with per-uuid 
locks
+  // in a ConcurrentHashMap
+  private static final ReentrantLock lock = new ReentrantLock();
+  private static final boolean verboseLogging = false;
+
+  private static class TimestampedOutput<T> {
+    final T value;
+    final @Nullable Instant timestamp;
+
+    TimestampedOutput(T value, @Nullable Instant timestamp) {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+  }
+
+  private static class InFlightElement<OutputT> {
+    final @Nullable Object key;
+    final CompletableFuture<List<TimestampedOutput<OutputT>>> future;
+
+    InFlightElement(
+        @Nullable Object key, 
CompletableFuture<List<TimestampedOutput<OutputT>>> future) {
+      this.key = key;
+      this.future = future;
+    }
+  }
+
+  // The In-Memory Accumulating Receiver
+  // Accumulates elements in-memory during asynchronous background worker 
execution.
+  // Buffered elements are only committed downstream once the parent task 
completes successfully
+  // and the timer fires.
+  private static class AccumulatingOutputReceiver<T> implements 
OutputReceiver<T> {
+    private final List<TimestampedOutput<T>> outputs =
+        Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) {
+      return org.apache.beam.sdk.values.WindowedValues.<T>builder()
+          .setValue(value)
+          .setTimestamp(Instant.now())
+          
.setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE))
+          
.setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING)
+          .setReceiver(
+              windowedValue ->
+                  outputs.add(
+                      new TimestampedOutput<>(
+                          windowedValue.getValue(), 
windowedValue.getTimestamp())));
+    }
+
+    // Bypasses the nested anonymous OutputBuilder instantiation for standard 
outputs.
+    // JVM optimization to prevent garbage collection pressure under high 
pipeline throughput.
+    @Override
+    public void output(T output) {
+      outputs.add(new TimestampedOutput<>(output, null));
+    }
+
+    @Override
+    public void outputWithTimestamp(T output, Instant timestamp) {
+      outputs.add(new TimestampedOutput<>(output, timestamp));
+    }
+
+    public List<T> getOutputs() {
+      List<T> rawOutputs = new ArrayList<>();
+      for (TimestampedOutput<T> out : outputs) {
+        rawOutputs.add(out.value);
+      }
+      return rawOutputs;
+    }
+
+    public List<TimestampedOutput<T>> getTimestampedOutputs() {
+      return outputs;
+    }
+  }

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The `AccumulatingOutputReceiver` currently uses `Instant.now()` for output 
timestamps in `builder(T value)`, and defaults to `null` (which becomes the 
timer's processing-time firing timestamp) when `syncFn` calls `output(T 
output)`. This corrupts the event-time timestamps of the elements, shifting 
them to the processing time of the background thread or timer firing, which 
will break downstream windowing and watermarks.
   
   To preserve event-time correctness, we should pass the input element's 
original timestamp to `AccumulatingOutputReceiver` and use it as the default 
timestamp for any outputs.
   
   ```java
     private static class AccumulatingOutputReceiver<T> implements 
OutputReceiver<T> {    private final List<TimestampedOutput<T>> outputs =       
 Collections.synchronizedList(new ArrayList<>());    private final Instant 
inputTimestamp;    AccumulatingOutputReceiver(Instant inputTimestamp) {      
this.inputTimestamp = inputTimestamp;    }    @Override    public 
org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) {      return 
org.apache.beam.sdk.values.WindowedValues.<T>builder()          
.setValue(value)          .setTimestamp(inputTimestamp)          
.setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE))         
 .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING)      
    .setReceiver(              windowedValue ->                  outputs.add(   
                   new TimestampedOutput<>(                          
windowedValue.getValue(), windowedValue.getTimestamp())));    }    @Override    
public void output(T output) {      outp
 uts.add(new TimestampedOutput<>(output, inputTimestamp));    }    @Override    
public void outputWithTimestamp(T output, Instant timestamp) {      
outputs.add(new TimestampedOutput<>(output, timestamp));    }    public List<T> 
getOutputs() {      List<T> rawOutputs = new ArrayList<>();      for 
(TimestampedOutput<T> out : outputs) {        rawOutputs.add(out.value);      } 
     return rawOutputs;    }    public List<TimestampedOutput<T>> 
getTimestampedOutputs() {      return outputs;    }  }
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java:
##########
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps a dofn and converts it from one which process elements 
synchronously to one
+ * which processes them asynchronously.
+ *
+ * <p>For synchronous dofns the default settings mean that many (100s) of 
elements will be processed
+ * in parallel and that processing an element will block all other work on 
that key. In addition
+ * runners are optimized for latencies less than a few seconds and longer 
operations can result in
+ * high retry rates. Async should be considered when the default parallelism 
is not correct and/or
+ * items are expected to take longer than a few seconds to process.
+ *
+ * <p>/* NOTE: 1) The wrapped syncFn REQUIRES thread-safety if BOTH 
parallelism > 1 and the DoFn is
+ * stateful. 2) Tagged output multi-outputs are unsupported. 3) 
StartBundle/finishBundle are invoked
+ * per element so any batching or aggregation logic will not behave as 
expected.
+ */
+public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, 
OutputT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class);
+
+  private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10;
+  private static final int DEFAULT_TIMEOUT_SEC = 1;
+  private static final int DEFAULT_MAX_WAIT_TIME_MS = 500;
+  private static final int TEARDOWN_AWAIT_SEC = 5;
+  private static final int INITIAL_BACKOFF_SLEEP_MS = 10;
+  private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000;
+
+  @StateId("to_process")
+  private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec;
+
+  @TimerId("timer")
+  private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+  private final DoFn<InputT, OutputT> syncFn;
+  private final int parallelism;
+  private final Duration timerFrequency;
+  private final int maxItemsToBuffer;
+  private final Duration timeout;
+  private final Duration maxWaitTime;
+  private final SerializableFunction<InputT, Object> idFn;
+  private final boolean useThreadPool;
+  private final String uuid;
+
+  private transient volatile @Nullable PipelineOptions pipelineOptions;
+
+  // Shared JVM-Wide States (Static Registries)
+  // Map-backed registry holding shared resources across serialized worker 
instances. Since runners
+  // clone DoFn instances on the same worker node, static maps ensure safe 
JVM-wide resource reuse.
+  private static final ConcurrentHashMap<String, ExecutorService> pool = new 
ConcurrentHashMap<>();
+  // activeElements (processingElements) is global JVM memory (all keys)
+  private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, 
InFlightElement<?>>>
+      processingElements = new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
+      new ConcurrentHashMap<>();
+  // Reference counts for cloned instances sharing the same UUID. Coordinates 
safe,
+  // leak-free thread pool shutdown during teardown without crashing active 
sibling clones.
+  private static final ConcurrentHashMap<String, AtomicInteger> refCounts =
+      new ConcurrentHashMap<>();
+
+  // If contention becomes a bottleneck, this can be replaced with per-uuid 
locks
+  // in a ConcurrentHashMap
+  private static final ReentrantLock lock = new ReentrantLock();

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `lock` is a static `ReentrantLock` shared across all instances of 
`AsyncDoFn` in the entire JVM. `scheduleIfRoom` (called for every element in 
`processElement`) and `commitFinishedItems` (called on every timer firing) both 
acquire this lock. As a result, every element admission and every timer-driven 
commit is serialized across all threads and all `AsyncDoFn` steps in the 
pipeline, including steps with unrelated UUIDs. The wrapped `syncFn` itself runs 
on the executor outside the lock. Even so, this single global choke point on 
scheduling and committing throttles throughput and undermines the purpose of 
asynchronous processing.
   
   We should use per-uuid locks (e.g., stored in a static 
`ConcurrentHashMap<String, ReentrantLock>`) to avoid JVM-wide contention.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java:
##########
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps a dofn and converts it from one which process elements 
synchronously to one
+ * which processes them asynchronously.
+ *
+ * <p>For synchronous dofns the default settings mean that many (100s) of 
elements will be processed
+ * in parallel and that processing an element will block all other work on 
that key. In addition
+ * runners are optimized for latencies less than a few seconds and longer 
operations can result in
+ * high retry rates. Async should be considered when the default parallelism 
is not correct and/or
+ * items are expected to take longer than a few seconds to process.
+ *
+ * <p>/* NOTE: 1) The wrapped syncFn REQUIRES thread-safety if BOTH 
parallelism > 1 and the DoFn is
+ * stateful. 2) Tagged output multi-outputs are unsupported. 3) 
StartBundle/finishBundle are invoked
+ * per element so any batching or aggregation logic will not behave as 
expected.
+ */
+public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, 
OutputT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class);
+
+  private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10;
+  private static final int DEFAULT_TIMEOUT_SEC = 1;
+  private static final int DEFAULT_MAX_WAIT_TIME_MS = 500;
+  private static final int TEARDOWN_AWAIT_SEC = 5;
+  private static final int INITIAL_BACKOFF_SLEEP_MS = 10;
+  private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000;
+
+  @StateId("to_process")
+  private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec;
+
+  @TimerId("timer")
+  private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+  private final DoFn<InputT, OutputT> syncFn;
+  private final int parallelism;
+  private final Duration timerFrequency;
+  private final int maxItemsToBuffer;
+  private final Duration timeout;
+  private final Duration maxWaitTime;
+  private final SerializableFunction<InputT, Object> idFn;
+  private final boolean useThreadPool;
+  private final String uuid;
+
+  private transient volatile @Nullable PipelineOptions pipelineOptions;
+
+  // Shared JVM-Wide States (Static Registries)
+  // Map-backed registry holding shared resources across serialized worker 
instances. Since runners
+  // clone DoFn instances on the same worker node, static maps ensure safe 
JVM-wide resource reuse.
+  private static final ConcurrentHashMap<String, ExecutorService> pool = new 
ConcurrentHashMap<>();
+  // activeElements (processingElements) is global JVM memory (all keys)
+  private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, 
InFlightElement<?>>>
+      processingElements = new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
+      new ConcurrentHashMap<>();
+  // Reference counts for cloned instances sharing the same UUID. Coordinates 
safe,
+  // leak-free thread pool shutdown during teardown without crashing active 
sibling clones.
+  private static final ConcurrentHashMap<String, AtomicInteger> refCounts =
+      new ConcurrentHashMap<>();
+
+  // If contention becomes a bottleneck, this can be replaced with per-uuid 
locks
+  // in a ConcurrentHashMap
+  private static final ReentrantLock lock = new ReentrantLock();
+  private static final boolean verboseLogging = false;
+
+  private static class TimestampedOutput<T> {
+    final T value;
+    final @Nullable Instant timestamp;
+
+    TimestampedOutput(T value, @Nullable Instant timestamp) {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+  }
+
+  private static class InFlightElement<OutputT> {
+    final @Nullable Object key;
+    final CompletableFuture<List<TimestampedOutput<OutputT>>> future;
+
+    InFlightElement(
+        @Nullable Object key, 
CompletableFuture<List<TimestampedOutput<OutputT>>> future) {
+      this.key = key;
+      this.future = future;
+    }
+  }
+
+  // The In-Memory Accumulating Receiver
+  // Accumulates elements in-memory during asynchronous background worker 
execution.
+  // Buffered elements are only committed downstream once the parent task 
completes successfully
+  // and the timer fires.
+  private static class AccumulatingOutputReceiver<T> implements 
OutputReceiver<T> {
+    private final List<TimestampedOutput<T>> outputs =
+        Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) {
+      return org.apache.beam.sdk.values.WindowedValues.<T>builder()
+          .setValue(value)
+          .setTimestamp(Instant.now())
+          
.setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE))
+          
.setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING)
+          .setReceiver(
+              windowedValue ->
+                  outputs.add(
+                      new TimestampedOutput<>(
+                          windowedValue.getValue(), 
windowedValue.getTimestamp())));
+    }
+
+    // Bypasses the nested anonymous OutputBuilder instantiation for standard 
outputs.
+    // JVM optimization to prevent garbage collection pressure under high 
pipeline throughput.
+    @Override
+    public void output(T output) {
+      outputs.add(new TimestampedOutput<>(output, null));
+    }
+
+    @Override
+    public void outputWithTimestamp(T output, Instant timestamp) {
+      outputs.add(new TimestampedOutput<>(output, timestamp));
+    }
+
+    public List<T> getOutputs() {
+      List<T> rawOutputs = new ArrayList<>();
+      for (TimestampedOutput<T> out : outputs) {
+        rawOutputs.add(out.value);
+      }
+      return rawOutputs;
+    }
+
+    public List<TimestampedOutput<T>> getTimestampedOutputs() {
+      return outputs;
+    }
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool) {
+    this(
+        syncFn,
+        parallelism,
+        timerFrequency,
+        maxItemsToBuffer,
+        timeout,
+        maxWaitTime,
+        idFn,
+        useThreadPool,
+        null);
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool,
+      @Nullable Coder<KV<K, InputT>> coder) {
+    this.syncFn = syncFn;
+    this.parallelism = parallelism;
+    if (timerFrequency.getMillis() <= 0) {
+      throw new IllegalArgumentException("timerFrequency must be greater than 
zero");
+    }
+    this.timerFrequency = timerFrequency;
+    this.maxItemsToBuffer =
+        (maxItemsToBuffer != null)
+            ? maxItemsToBuffer
+            : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY);
+    this.timeout = (timeout != null) ? timeout : 
Duration.standardSeconds(DEFAULT_TIMEOUT_SEC);
+    this.maxWaitTime =
+        (maxWaitTime != null) ? maxWaitTime : 
Duration.millis(DEFAULT_MAX_WAIT_TIME_MS);
+    this.idFn =
+        (idFn != null)
+            ? idFn
+            : (SerializableFunction<InputT, Object>)
+                input -> java.util.Objects.requireNonNull(input);
+    this.useThreadPool = useThreadPool;
+    this.uuid = UUID.randomUUID().toString();
+    this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : 
StateSpecs.bag();
+  }
+
+  private ExecutorService getThreadPool() {
+    ExecutorService threadPool = pool.get(uuid);
+    if (threadPool == null) {
+      throw new IllegalStateException("Thread pool not initialized for UUID: " 
+ uuid);
+    }
+    return threadPool;
+  }
+
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<Object, InFlightElement<OutputT>> 
getProcessingElements() {
+    ConcurrentHashMap<Object, InFlightElement<?>> elements = 
processingElements.get(uuid);
+    if (elements == null) {
+      throw new IllegalStateException("Processing elements map not initialized 
for UUID: " + uuid);
+    }
+    return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) 
(ConcurrentHashMap<?, ?>) elements;
+  }
+
+  private AtomicInteger getItemsInBuffer() {
+    AtomicInteger buffer = itemsInBuffer.get(uuid);
+    if (buffer == null) {
+      throw new IllegalStateException("Buffer counter not initialized for 
UUID: " + uuid);
+    }
+    return buffer;
+  }
+
+  @Setup
+  public void setup(PipelineOptions options) {
+    this.pipelineOptions = options;
+
+    // Setup the wrapped DoFn
+    DoFnInvokers.invokerFor(syncFn)
+        .invokeSetup(
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public PipelineOptions pipelineOptions() {
+                return options;
+              }
+
+              @Override
+              public String getErrorContext() {
+                return "AsyncDoFn/Setup";
+              }
+            });
+
+    if (useThreadPool) {
+      LOG.info("Using thread pool for asynchronous execution with parallelism 
{}", parallelism);
+    }
+
+    lock.lock();
+    try {
+      pool.computeIfAbsent(uuid, k -> 
Executors.newFixedThreadPool(parallelism));
+      processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>());
+      itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0));
+      refCounts.computeIfAbsent(uuid, k -> new 
AtomicInteger(0)).incrementAndGet();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  // Clean up JVM-wide shared resources to prevent thread leaks on the worker
+  @Teardown
+  public void teardown() {
+    DoFnInvokers.invokerFor(syncFn).invokeTeardown();
+    ExecutorService threadPool = null;
+    lock.lock();
+    try {
+      AtomicInteger refCount = refCounts.get(uuid);
+      if (refCount != null && refCount.decrementAndGet() == 0) {
+        refCounts.remove(uuid);
+        threadPool = pool.remove(uuid);
+        processingElements.remove(uuid);
+        itemsInBuffer.remove(uuid);
+      }
+    } finally {
+      lock.unlock();
+    }
+    if (threadPool != null) {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(TEARDOWN_AWAIT_SEC, 
TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  // Asynchronous Scheduling & Deduplication
+  // Submits tasks to the background thread pool. If an element with the same 
ID is already
+  // in-flight,
+  // the submission is silently ignored to enforce exactly-once semantics.
+  private boolean scheduleIfRoom(
+      KV<K, InputT> element, BoundedWindow window, Instant timestamp, boolean 
ignoreBuffer) {
+    lock.lock();
+    try {
+      ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = 
getProcessingElements();
+      Object elementId = idFn.apply(element.getValue());
+
+      if (activeElements.containsKey(elementId)) {
+        LOG.info("Item {} already in processing elements", element);
+        return true;
+      }
+
+      int currentBuffer = getItemsInBuffer().get();
+      if (currentBuffer < maxItemsToBuffer || ignoreBuffer) {
+        java.util.concurrent.Executor executor =
+            useThreadPool ? getThreadPool() : 
java.util.concurrent.ForkJoinPool.commonPool();
+
+        // Pending asynchronous task that will produce a list of outputs
+        CompletableFuture<List<TimestampedOutput<OutputT>>> future =
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    AccumulatingOutputReceiver<OutputT> receiver =
+                        new AccumulatingOutputReceiver<>();
+                    DoFnInvoker<InputT, OutputT> invoker = 
DoFnInvokers.invokerFor(syncFn);
+
+                    DoFnInvoker.ArgumentProvider<InputT, OutputT> 
bundleArgProvider =
+                        new DoFnInvoker.BaseArgumentProvider<InputT, 
OutputT>() {
+                          @Override
+                          public PipelineOptions pipelineOptions() {
+                            PipelineOptions options = pipelineOptions;
+                            if (options == null) {
+                              throw new IllegalStateException("PipelineOptions 
not set");
+                            }
+                            return options;
+                          }
+
+                          @Override
+                          public DoFn<InputT, OutputT>.FinishBundleContext 
finishBundleContext(
+                              DoFn<InputT, OutputT> doFn) {
+                            return doFn.new FinishBundleContext() {
+                              @Override
+                              public PipelineOptions getPipelineOptions() {
+                                return pipelineOptions();
+                              }
+
+                              @Override
+                              public void output(
+                                  OutputT output, Instant timestamp, 
BoundedWindow window) {
+                                receiver.outputWithTimestamp(output, 
timestamp);
+                              }
+
+                              @Override
+                              public <T> void output(
+                                  TupleTag<T> tag,
+                                  T output,
+                                  Instant timestamp,
+                                  BoundedWindow window) {
+                                throw new UnsupportedOperationException(
+                                    "Tagged output not supported in "
+                                        + "FinishBundleContext for AsyncDoFn");
+                              }
+                            };
+                          }
+
+                          @Override
+                          public String getErrorContext() {
+                            return "AsyncDoFn/Bundle";
+                          }
+                        };
+
+                    invoker.invokeStartBundle(bundleArgProvider);
+
+                    DoFnInvoker.ArgumentProvider<InputT, OutputT> 
processArgProvider =
+                        new DoFnInvoker.BaseArgumentProvider<InputT, 
OutputT>() {
+                          @Override
+                          public InputT element(DoFn<InputT, OutputT> doFn) {
+                            return element.getValue();
+                          }
+
+                          @Override
+                          public OutputReceiver<OutputT> outputReceiver(
+                              DoFn<InputT, OutputT> doFn) {
+                            return receiver;
+                          }
+
+                          @Override
+                          public BoundedWindow window() {
+                            return window;
+                          }
+
+                          @Override
+                          public Instant timestamp(DoFn<InputT, OutputT> doFn) 
{
+                            return timestamp;
+                          }
+
+                          @Override
+                          public PipelineOptions pipelineOptions() {
+                            PipelineOptions options = pipelineOptions;
+                            if (options == null) {
+                              throw new IllegalStateException("PipelineOptions 
not set");
+                            }
+                            return options;
+                          }
+
+                          @Override
+                          public String getErrorContext() {
+                            return "AsyncDoFn/Process";
+                          }
+                        };
+
+                    invoker.invokeProcessElement(processArgProvider);
+                    invoker.invokeFinishBundle(bundleArgProvider);
+
+                    return receiver.getTimestampedOutputs();
+                  } catch (Exception e) {
+                    throw new CompletionException(e);
+                  }
+                },
+                executor);
+
+        // Assigned to 'unused' to satisfy ErrorProne while preserving parent 
future for
+        // cancellation
+        CompletableFuture<List<TimestampedOutput<OutputT>>> unused =
+            future.whenComplete(
+                (res, ex) -> {
+                  getItemsInBuffer().decrementAndGet();
+                });
+
+        activeElements.put(elementId, new InFlightElement<>(element.getKey(), 
future));
+        getItemsInBuffer().incrementAndGet();
+        return true;
+      }
+
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void scheduleItem(KV<K, InputT> element, BoundedWindow window, 
Instant timestamp) {
+    boolean done = false;
+    long sleepTime = INITIAL_BACKOFF_SLEEP_MS;
+    long totalSleep = 0;
+    long timeoutMs = timeout.getMillis();
+
+    while (!done && totalSleep < timeoutMs) {
+      done = scheduleIfRoom(element, window, timestamp, false);
+      if (!done) {
+        long sleep = Math.min(maxWaitTime.getMillis(), sleepTime);
+        if (verboseLogging || totalSleep > BACKPRESSURE_LOG_THRESHOLD_MS) {
+          LOG.info(
+              "buffer is full for item {}, {} waiting {} ms. Have waited for 
{} ms.",
+              element,
+              getItemsInBuffer().get(),
+              sleep,
+              totalSleep);
+        }
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while waiting for space in 
buffer", e);
+        }
+
+        // Prevents long overflow possibility
+        if (sleepTime < maxWaitTime.getMillis()) {
+          sleepTime *= 2;
+        }
+
+        totalSleep += sleep;
+      }
+    }
+    // Timeout: element skips JVM pool but stays in BagState for timer to 
reschedule later.
+  }
+
+  // Uses hashcode based jitter instead of random for deterministic 
rescheduling
+  // Satisfies lint check
+  private Instant nextTimeToFire(@Nullable K key) {
+    long seed = (key == null) ? 0 : key.hashCode();
+    double fractionalOffset = Math.abs(seed % 1000000) / 1000000.0;
+    double timerFrequencySec = timerFrequency.getMillis() / 1000.0;
+    double nowSec = System.currentTimeMillis() / 1000.0;
+
+    double base = Math.floor((nowSec + timerFrequencySec) / timerFrequencySec) 
* timerFrequencySec;
+    double offset = fractionalOffset * timerFrequencySec;
+
+    return Instant.ofEpochMilli((long) ((base + offset) * 1000));
+  }
+
+  @ProcessElement
+  public void processElement(
+      ProcessContext c,
+      BoundedWindow window,
+      @StateId("to_process") BagState<KV<K, InputT>> toProcessState,
+      @TimerId("timer") Timer timer) {
+
+    KV<K, InputT> element = c.element();
+    scheduleItem(element, window, c.timestamp());
+    toProcessState.add(element);
+
+    Instant timeToFire = nextTimeToFire(element.getKey());
+    timer.set(timeToFire);
+  }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   In `processElement`, the timer is set to `nextTimeToFire(element.getKey())` 
for every incoming element. If elements for a key arrive continuously at a rate 
faster than the timer frequency, the timer will be repeatedly overwritten and 
pushed further into the future. This causes starvation, as the timer may never 
fire, and elements in `toProcessState` will be delayed indefinitely.
   
   To prevent this, we should use a `ValueState<Boolean>` to track if a timer 
is already active for the key, and only set the timer if one is not already 
pending.
   
   ```java
     @ProcessElement
     public void processElement(
         ProcessContext c,
         BoundedWindow window,
         @StateId("to_process") BagState<KV<K, InputT>> toProcessState,
         @StateId("timer_set") ValueState<Boolean> timerSetState,
         @TimerId("timer") Timer timer) {
       KV<K, InputT> element = c.element();
       scheduleItem(element, window, c.timestamp());
       toProcessState.add(element);
       // Arm the timer only when none is pending for this key.
       // NOTE(review): this needs a @StateId("timer_set") StateSpec on the class, and @OnTimer
       // must clear the flag when the timer fires; otherwise the timer is never re-armed.
       if (!Boolean.TRUE.equals(timerSetState.read())) {
         timer.set(nextTimeToFire(element.getKey()));
         timerSetState.write(true);
       }
     }
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java:
##########
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps a DoFn and converts it from one which processes elements synchronously to one
+ * which processes them asynchronously.
+ *
+ * <p>For synchronous dofns the default settings mean that many (100s) of 
elements will be processed
+ * in parallel and that processing an element will block all other work on 
that key. In addition
+ * runners are optimized for latencies less than a few seconds and longer 
operations can result in
+ * high retry rates. Async should be considered when the default parallelism 
is not correct and/or
+ * items are expected to take longer than a few seconds to process.
+ *
+ * <p>NOTE:
+ *
+ * <ol>
+ *   <li>The wrapped {@code syncFn} must be thread-safe if {@code parallelism > 1} and it holds
+ *       mutable state, because one instance processes several elements concurrently.
+ *   <li>Tagged (multi-)outputs are not supported.
+ *   <li>{@code @StartBundle} and {@code @FinishBundle} are invoked once per element, so any
+ *       batching or aggregation logic in them will not behave as expected.
+ * </ol>
+ */
+public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, 
OutputT> {
+
  private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class);

  // Lower bound of the default maxItemsToBuffer (default is max(2 * parallelism, this value)).
  private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10;
  // Default total time, in seconds, that scheduling may back off waiting for buffer space.
  private static final int DEFAULT_TIMEOUT_SEC = 1;
  // Default cap, in milliseconds, on a single backoff sleep.
  private static final int DEFAULT_MAX_WAIT_TIME_MS = 500;
  // Seconds teardown waits for the thread pool to terminate before calling shutdownNow().
  private static final int TEARDOWN_AWAIT_SEC = 5;
  // First backoff sleep when the buffer is full; doubled on each retry up to maxWaitTime.
  private static final int INITIAL_BACKOFF_SLEEP_MS = 10;
  // Backoff waits longer than this are logged even when verbose logging is off.
  private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000;

  // Per-key bag holding every element received by processElement.
  @StateId("to_process")
  private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec;

  // Per-key processing-time timer, armed by processElement.
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  // The wrapped DoFn, invoked once per element on a background thread.
  private final DoFn<InputT, OutputT> syncFn;
  // Size of the dedicated thread pool.
  private final int parallelism;
  // Period of the per-key timer.
  private final Duration timerFrequency;
  // Maximum number of elements in flight at once (per uuid).
  private final int maxItemsToBuffer;
  // Total backoff budget when scheduling an element.
  private final Duration timeout;
  // Cap on a single backoff sleep.
  private final Duration maxWaitTime;
  // Maps an input to the id used to deduplicate in-flight elements.
  private final SerializableFunction<InputT, Object> idFn;
  // If true, run work on the dedicated pool; otherwise on the common ForkJoinPool.
  private final boolean useThreadPool;
  // Assigned once in the constructor, so serialized copies of this DoFn share it; keys the
  // static registries below.
  private final String uuid;

  // Assigned in @Setup; transient, so it is null on a freshly deserialized instance.
  private transient volatile @Nullable PipelineOptions pipelineOptions;

  // Shared JVM-Wide States (Static Registries)
  // Map-backed registry holding shared resources across serialized worker instances. Since runners
  // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse.
  private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>();
  // activeElements (processingElements) is global JVM memory (all keys): element id -> in-flight
  // task, per uuid.
  private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>>
      processingElements = new ConcurrentHashMap<>();
  // Number of submitted tasks that have not completed yet, per uuid.
  private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
      new ConcurrentHashMap<>();
  // Reference counts for cloned instances sharing the same UUID (incremented in setup, decremented
  // in teardown). Coordinates safe, leak-free thread pool shutdown during teardown without
  // crashing active sibling clones.
  private static final ConcurrentHashMap<String, AtomicInteger> refCounts =
      new ConcurrentHashMap<>();

  // Single JVM-wide lock guarding registry bookkeeping and scheduling for all instances.
  // If contention becomes a bottleneck, this can be replaced with per-uuid locks
  // in a ConcurrentHashMap
  private static final ReentrantLock lock = new ReentrantLock();
  // Compile-time switch: when true, every backoff wait is logged.
  private static final boolean verboseLogging = false;
+
+  private static class TimestampedOutput<T> {
+    final T value;
+    final @Nullable Instant timestamp;
+
+    TimestampedOutput(T value, @Nullable Instant timestamp) {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+  }
+
+  private static class InFlightElement<OutputT> {
+    final @Nullable Object key;
+    final CompletableFuture<List<TimestampedOutput<OutputT>>> future;
+
+    InFlightElement(
+        @Nullable Object key, 
CompletableFuture<List<TimestampedOutput<OutputT>>> future) {
+      this.key = key;
+      this.future = future;
+    }
+  }
+
+  // The In-Memory Accumulating Receiver
+  // Accumulates elements in-memory during asynchronous background worker 
execution.
+  // Buffered elements are only committed downstream once the parent task 
completes successfully
+  // and the timer fires.
+  private static class AccumulatingOutputReceiver<T> implements 
OutputReceiver<T> {
+    private final List<TimestampedOutput<T>> outputs =
+        Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) {
+      return org.apache.beam.sdk.values.WindowedValues.<T>builder()
+          .setValue(value)
+          .setTimestamp(Instant.now())
+          
.setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE))
+          
.setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING)
+          .setReceiver(
+              windowedValue ->
+                  outputs.add(
+                      new TimestampedOutput<>(
+                          windowedValue.getValue(), 
windowedValue.getTimestamp())));
+    }
+
+    // Bypasses the nested anonymous OutputBuilder instantiation for standard 
outputs.
+    // JVM optimization to prevent garbage collection pressure under high 
pipeline throughput.
+    @Override
+    public void output(T output) {
+      outputs.add(new TimestampedOutput<>(output, null));
+    }
+
+    @Override
+    public void outputWithTimestamp(T output, Instant timestamp) {
+      outputs.add(new TimestampedOutput<>(output, timestamp));
+    }
+
+    public List<T> getOutputs() {
+      List<T> rawOutputs = new ArrayList<>();
+      for (TimestampedOutput<T> out : outputs) {
+        rawOutputs.add(out.value);
+      }
+      return rawOutputs;
+    }
+
+    public List<TimestampedOutput<T>> getTimestampedOutputs() {
+      return outputs;
+    }
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool) {
+    this(
+        syncFn,
+        parallelism,
+        timerFrequency,
+        maxItemsToBuffer,
+        timeout,
+        maxWaitTime,
+        idFn,
+        useThreadPool,
+        null);
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool,
+      @Nullable Coder<KV<K, InputT>> coder) {
+    this.syncFn = syncFn;
+    this.parallelism = parallelism;
+    if (timerFrequency.getMillis() <= 0) {
+      throw new IllegalArgumentException("timerFrequency must be greater than 
zero");
+    }
+    this.timerFrequency = timerFrequency;
+    this.maxItemsToBuffer =
+        (maxItemsToBuffer != null)
+            ? maxItemsToBuffer
+            : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY);
+    this.timeout = (timeout != null) ? timeout : 
Duration.standardSeconds(DEFAULT_TIMEOUT_SEC);
+    this.maxWaitTime =
+        (maxWaitTime != null) ? maxWaitTime : 
Duration.millis(DEFAULT_MAX_WAIT_TIME_MS);
+    this.idFn =
+        (idFn != null)
+            ? idFn
+            : (SerializableFunction<InputT, Object>)
+                input -> java.util.Objects.requireNonNull(input);
+    this.useThreadPool = useThreadPool;
+    this.uuid = UUID.randomUUID().toString();
+    this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : 
StateSpecs.bag();
+  }
+
+  private ExecutorService getThreadPool() {
+    ExecutorService threadPool = pool.get(uuid);
+    if (threadPool == null) {
+      throw new IllegalStateException("Thread pool not initialized for UUID: " 
+ uuid);
+    }
+    return threadPool;
+  }
+
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<Object, InFlightElement<OutputT>> 
getProcessingElements() {
+    ConcurrentHashMap<Object, InFlightElement<?>> elements = 
processingElements.get(uuid);
+    if (elements == null) {
+      throw new IllegalStateException("Processing elements map not initialized 
for UUID: " + uuid);
+    }
+    return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) 
(ConcurrentHashMap<?, ?>) elements;
+  }
+
+  private AtomicInteger getItemsInBuffer() {
+    AtomicInteger buffer = itemsInBuffer.get(uuid);
+    if (buffer == null) {
+      throw new IllegalStateException("Buffer counter not initialized for 
UUID: " + uuid);
+    }
+    return buffer;
+  }
+
+  @Setup
+  public void setup(PipelineOptions options) {
+    this.pipelineOptions = options;
+
+    // Setup the wrapped DoFn
+    DoFnInvokers.invokerFor(syncFn)
+        .invokeSetup(
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public PipelineOptions pipelineOptions() {
+                return options;
+              }
+
+              @Override
+              public String getErrorContext() {
+                return "AsyncDoFn/Setup";
+              }
+            });
+
+    if (useThreadPool) {
+      LOG.info("Using thread pool for asynchronous execution with parallelism 
{}", parallelism);
+    }
+
+    lock.lock();
+    try {
+      pool.computeIfAbsent(uuid, k -> 
Executors.newFixedThreadPool(parallelism));
+      processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>());
+      itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0));
+      refCounts.computeIfAbsent(uuid, k -> new 
AtomicInteger(0)).incrementAndGet();
+    } finally {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   If `useThreadPool` is false, we still create a fixed thread pool of size 
`parallelism` and store it in `pool`. This is wasteful and can lead to resource 
leaks. We should only create the thread pool if `useThreadPool` is true.
   
   ```java
       try {      if (useThreadPool) {        pool.computeIfAbsent(uuid, k -> 
Executors.newFixedThreadPool(parallelism));      }      
processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>());      
itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0));      
refCounts.computeIfAbsent(uuid, k -> new AtomicInteger(0)).incrementAndGet();   
 } finally {
   ```



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java:
##########
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps a DoFn and converts it from one which processes elements synchronously to one
+ * which processes them asynchronously.
+ *
+ * <p>For synchronous dofns the default settings mean that many (100s) of 
elements will be processed
+ * in parallel and that processing an element will block all other work on 
that key. In addition
+ * runners are optimized for latencies less than a few seconds and longer 
operations can result in
+ * high retry rates. Async should be considered when the default parallelism 
is not correct and/or
+ * items are expected to take longer than a few seconds to process.
+ *
+ * <p>NOTE:
+ *
+ * <ol>
+ *   <li>The wrapped {@code syncFn} must be thread-safe if {@code parallelism > 1} and it holds
+ *       mutable state, because one instance processes several elements concurrently.
+ *   <li>Tagged (multi-)outputs are not supported.
+ *   <li>{@code @StartBundle} and {@code @FinishBundle} are invoked once per element, so any
+ *       batching or aggregation logic in them will not behave as expected.
+ * </ol>
+ */
+public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, 
OutputT> {
+
  private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class);

  // Lower bound of the default maxItemsToBuffer (default is max(2 * parallelism, this value)).
  private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10;
  // Default total time, in seconds, that scheduling may back off waiting for buffer space.
  private static final int DEFAULT_TIMEOUT_SEC = 1;
  // Default cap, in milliseconds, on a single backoff sleep.
  private static final int DEFAULT_MAX_WAIT_TIME_MS = 500;
  // Seconds teardown waits for the thread pool to terminate before calling shutdownNow().
  private static final int TEARDOWN_AWAIT_SEC = 5;
  // First backoff sleep when the buffer is full; doubled on each retry up to maxWaitTime.
  private static final int INITIAL_BACKOFF_SLEEP_MS = 10;
  // Backoff waits longer than this are logged even when verbose logging is off.
  private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000;

  // Per-key bag holding every element received by processElement.
  @StateId("to_process")
  private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec;

  // Per-key processing-time timer, armed by processElement.
  @TimerId("timer")
  private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

  // The wrapped DoFn, invoked once per element on a background thread.
  private final DoFn<InputT, OutputT> syncFn;
  // Size of the dedicated thread pool.
  private final int parallelism;
  // Period of the per-key timer.
  private final Duration timerFrequency;
  // Maximum number of elements in flight at once (per uuid).
  private final int maxItemsToBuffer;
  // Total backoff budget when scheduling an element.
  private final Duration timeout;
  // Cap on a single backoff sleep.
  private final Duration maxWaitTime;
  // Maps an input to the id used to deduplicate in-flight elements.
  private final SerializableFunction<InputT, Object> idFn;
  // If true, run work on the dedicated pool; otherwise on the common ForkJoinPool.
  private final boolean useThreadPool;
  // Assigned once in the constructor, so serialized copies of this DoFn share it; keys the
  // static registries below.
  private final String uuid;

  // Assigned in @Setup; transient, so it is null on a freshly deserialized instance.
  private transient volatile @Nullable PipelineOptions pipelineOptions;

  // Shared JVM-Wide States (Static Registries)
  // Map-backed registry holding shared resources across serialized worker instances. Since runners
  // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse.
  private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>();
  // activeElements (processingElements) is global JVM memory (all keys): element id -> in-flight
  // task, per uuid.
  private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>>
      processingElements = new ConcurrentHashMap<>();
  // Number of submitted tasks that have not completed yet, per uuid.
  private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
      new ConcurrentHashMap<>();
  // Reference counts for cloned instances sharing the same UUID (incremented in setup, decremented
  // in teardown). Coordinates safe, leak-free thread pool shutdown during teardown without
  // crashing active sibling clones.
  private static final ConcurrentHashMap<String, AtomicInteger> refCounts =
      new ConcurrentHashMap<>();

  // Single JVM-wide lock guarding registry bookkeeping and scheduling for all instances.
  // If contention becomes a bottleneck, this can be replaced with per-uuid locks
  // in a ConcurrentHashMap
  private static final ReentrantLock lock = new ReentrantLock();
  // Compile-time switch: when true, every backoff wait is logged.
  private static final boolean verboseLogging = false;
+
+  private static class TimestampedOutput<T> {
+    final T value;
+    final @Nullable Instant timestamp;
+
+    TimestampedOutput(T value, @Nullable Instant timestamp) {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+  }
+
+  private static class InFlightElement<OutputT> {
+    final @Nullable Object key;
+    final CompletableFuture<List<TimestampedOutput<OutputT>>> future;
+
+    InFlightElement(
+        @Nullable Object key, 
CompletableFuture<List<TimestampedOutput<OutputT>>> future) {
+      this.key = key;
+      this.future = future;
+    }
+  }
+
+  // The In-Memory Accumulating Receiver
+  // Accumulates elements in-memory during asynchronous background worker 
execution.
+  // Buffered elements are only committed downstream once the parent task 
completes successfully
+  // and the timer fires.
+  private static class AccumulatingOutputReceiver<T> implements 
OutputReceiver<T> {
+    private final List<TimestampedOutput<T>> outputs =
+        Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) {
+      return org.apache.beam.sdk.values.WindowedValues.<T>builder()
+          .setValue(value)
+          .setTimestamp(Instant.now())
+          
.setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE))
+          
.setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING)
+          .setReceiver(
+              windowedValue ->
+                  outputs.add(
+                      new TimestampedOutput<>(
+                          windowedValue.getValue(), 
windowedValue.getTimestamp())));
+    }
+
+    // Bypasses the nested anonymous OutputBuilder instantiation for standard 
outputs.
+    // JVM optimization to prevent garbage collection pressure under high 
pipeline throughput.
+    @Override
+    public void output(T output) {
+      outputs.add(new TimestampedOutput<>(output, null));
+    }
+
+    @Override
+    public void outputWithTimestamp(T output, Instant timestamp) {
+      outputs.add(new TimestampedOutput<>(output, timestamp));
+    }
+
+    public List<T> getOutputs() {
+      List<T> rawOutputs = new ArrayList<>();
+      for (TimestampedOutput<T> out : outputs) {
+        rawOutputs.add(out.value);
+      }
+      return rawOutputs;
+    }
+
+    public List<TimestampedOutput<T>> getTimestampedOutputs() {
+      return outputs;
+    }
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool) {
+    this(
+        syncFn,
+        parallelism,
+        timerFrequency,
+        maxItemsToBuffer,
+        timeout,
+        maxWaitTime,
+        idFn,
+        useThreadPool,
+        null);
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool,
+      @Nullable Coder<KV<K, InputT>> coder) {
+    this.syncFn = syncFn;
+    this.parallelism = parallelism;
+    if (timerFrequency.getMillis() <= 0) {
+      throw new IllegalArgumentException("timerFrequency must be greater than 
zero");
+    }
+    this.timerFrequency = timerFrequency;
+    this.maxItemsToBuffer =
+        (maxItemsToBuffer != null)
+            ? maxItemsToBuffer
+            : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY);
+    this.timeout = (timeout != null) ? timeout : 
Duration.standardSeconds(DEFAULT_TIMEOUT_SEC);
+    this.maxWaitTime =
+        (maxWaitTime != null) ? maxWaitTime : 
Duration.millis(DEFAULT_MAX_WAIT_TIME_MS);
+    this.idFn =
+        (idFn != null)
+            ? idFn
+            : (SerializableFunction<InputT, Object>)
+                input -> java.util.Objects.requireNonNull(input);
+    this.useThreadPool = useThreadPool;
+    this.uuid = UUID.randomUUID().toString();
+    this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : 
StateSpecs.bag();
+  }
+
+  private ExecutorService getThreadPool() {
+    ExecutorService threadPool = pool.get(uuid);
+    if (threadPool == null) {
+      throw new IllegalStateException("Thread pool not initialized for UUID: " 
+ uuid);
+    }
+    return threadPool;
+  }
+
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<Object, InFlightElement<OutputT>> 
getProcessingElements() {
+    ConcurrentHashMap<Object, InFlightElement<?>> elements = 
processingElements.get(uuid);
+    if (elements == null) {
+      throw new IllegalStateException("Processing elements map not initialized 
for UUID: " + uuid);
+    }
+    return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) 
(ConcurrentHashMap<?, ?>) elements;
+  }
+
+  private AtomicInteger getItemsInBuffer() {
+    AtomicInteger buffer = itemsInBuffer.get(uuid);
+    if (buffer == null) {
+      throw new IllegalStateException("Buffer counter not initialized for 
UUID: " + uuid);
+    }
+    return buffer;
+  }
+
+  @Setup
+  public void setup(PipelineOptions options) {
+    this.pipelineOptions = options;
+
+    // Setup the wrapped DoFn
+    DoFnInvokers.invokerFor(syncFn)
+        .invokeSetup(
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public PipelineOptions pipelineOptions() {
+                return options;
+              }
+
+              @Override
+              public String getErrorContext() {
+                return "AsyncDoFn/Setup";
+              }
+            });
+
+    if (useThreadPool) {
+      LOG.info("Using thread pool for asynchronous execution with parallelism 
{}", parallelism);
+    }
+
+    lock.lock();
+    try {
+      pool.computeIfAbsent(uuid, k -> 
Executors.newFixedThreadPool(parallelism));
+      processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>());
+      itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0));
+      refCounts.computeIfAbsent(uuid, k -> new 
AtomicInteger(0)).incrementAndGet();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  // Clean up JVM-wide shared resources to prevent thread leaks on the worker
+  @Teardown
+  public void teardown() {
+    DoFnInvokers.invokerFor(syncFn).invokeTeardown();
+    ExecutorService threadPool = null;
+    lock.lock();
+    try {
+      AtomicInteger refCount = refCounts.get(uuid);
+      if (refCount != null && refCount.decrementAndGet() == 0) {
+        refCounts.remove(uuid);
+        threadPool = pool.remove(uuid);
+        processingElements.remove(uuid);
+        itemsInBuffer.remove(uuid);
+      }
+    } finally {
+      lock.unlock();
+    }
+    if (threadPool != null) {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(TEARDOWN_AWAIT_SEC, 
TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  // Asynchronous Scheduling & Deduplication
+  // Submits tasks to the background thread pool. If an element with the same 
ID is already
+  // in-flight,
+  // the submission is silently ignored to enforce exactly-once semantics.
+  private boolean scheduleIfRoom(
+      KV<K, InputT> element, BoundedWindow window, Instant timestamp, boolean 
ignoreBuffer) {
+    lock.lock();
+    try {
+      ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = 
getProcessingElements();
+      Object elementId = idFn.apply(element.getValue());
+
+      if (activeElements.containsKey(elementId)) {
+        LOG.info("Item {} already in processing elements", element);
+        return true;
+      }
+
+      int currentBuffer = getItemsInBuffer().get();
+      if (currentBuffer < maxItemsToBuffer || ignoreBuffer) {
+        java.util.concurrent.Executor executor =
+            useThreadPool ? getThreadPool() : 
java.util.concurrent.ForkJoinPool.commonPool();
+
+        // Pending asynchronous task that will produce a list of outputs
+        CompletableFuture<List<TimestampedOutput<OutputT>>> future =
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    AccumulatingOutputReceiver<OutputT> receiver =
+                        new AccumulatingOutputReceiver<>();
+                    DoFnInvoker<InputT, OutputT> invoker = 
DoFnInvokers.invokerFor(syncFn);
+
+                    DoFnInvoker.ArgumentProvider<InputT, OutputT> 
bundleArgProvider =
+                        new DoFnInvoker.BaseArgumentProvider<InputT, 
OutputT>() {
+                          @Override
+                          public PipelineOptions pipelineOptions() {
+                            PipelineOptions options = pipelineOptions;
+                            if (options == null) {
+                              throw new IllegalStateException("PipelineOptions 
not set");
+                            }
+                            return options;
+                          }
+
+                          @Override
+                          public DoFn<InputT, OutputT>.FinishBundleContext 
finishBundleContext(
+                              DoFn<InputT, OutputT> doFn) {
+                            return doFn.new FinishBundleContext() {
+                              @Override
+                              public PipelineOptions getPipelineOptions() {
+                                return pipelineOptions();
+                              }
+
+                              @Override
+                              public void output(
+                                  OutputT output, Instant timestamp, 
BoundedWindow window) {
+                                receiver.outputWithTimestamp(output, 
timestamp);
+                              }
+
+                              @Override
+                              public <T> void output(
+                                  TupleTag<T> tag,
+                                  T output,
+                                  Instant timestamp,
+                                  BoundedWindow window) {
+                                throw new UnsupportedOperationException(
+                                    "Tagged output not supported in "
+                                        + "FinishBundleContext for AsyncDoFn");
+                              }
+                            };
+                          }
+
+                          @Override
+                          public String getErrorContext() {
+                            return "AsyncDoFn/Bundle";
+                          }
+                        };
+
+                    invoker.invokeStartBundle(bundleArgProvider);
+
+                    DoFnInvoker.ArgumentProvider<InputT, OutputT> 
processArgProvider =
+                        new DoFnInvoker.BaseArgumentProvider<InputT, 
OutputT>() {
+                          @Override
+                          public InputT element(DoFn<InputT, OutputT> doFn) {
+                            return element.getValue();
+                          }
+
+                          @Override
+                          public OutputReceiver<OutputT> outputReceiver(
+                              DoFn<InputT, OutputT> doFn) {
+                            return receiver;
+                          }
+
+                          @Override
+                          public BoundedWindow window() {
+                            return window;
+                          }
+
+                          @Override
+                          public Instant timestamp(DoFn<InputT, OutputT> doFn) 
{
+                            return timestamp;
+                          }
+
+                          @Override
+                          public PipelineOptions pipelineOptions() {
+                            PipelineOptions options = pipelineOptions;
+                            if (options == null) {
+                              throw new IllegalStateException("PipelineOptions 
not set");
+                            }
+                            return options;
+                          }
+
+                          @Override
+                          public String getErrorContext() {
+                            return "AsyncDoFn/Process";
+                          }
+                        };
+
+                    invoker.invokeProcessElement(processArgProvider);
+                    invoker.invokeFinishBundle(bundleArgProvider);
+
+                    return receiver.getTimestampedOutputs();
+                  } catch (Exception e) {
+                    throw new CompletionException(e);
+                  }
+                },
+                executor);
+
+        // Assigned to 'unused' to satisfy ErrorProne while preserving parent 
future for
+        // cancellation
+        CompletableFuture<List<TimestampedOutput<OutputT>>> unused =
+            future.whenComplete(
+                (res, ex) -> {
+                  getItemsInBuffer().decrementAndGet();
+                });
+
+        activeElements.put(elementId, new InFlightElement<>(element.getKey(), 
future));
+        getItemsInBuffer().incrementAndGet();
+        return true;
+      }
+
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void scheduleItem(KV<K, InputT> element, BoundedWindow window, 
Instant timestamp) {
+    boolean done = false;
+    long sleepTime = INITIAL_BACKOFF_SLEEP_MS;
+    long totalSleep = 0;
+    long timeoutMs = timeout.getMillis();
+
+    while (!done && totalSleep < timeoutMs) {
+      done = scheduleIfRoom(element, window, timestamp, false);
+      if (!done) {
+        long sleep = Math.min(maxWaitTime.getMillis(), sleepTime);
+        if (verboseLogging || totalSleep > BACKPRESSURE_LOG_THRESHOLD_MS) {
+          LOG.info(
+              "buffer is full for item {}, {} waiting {} ms. Have waited for 
{} ms.",
+              element,
+              getItemsInBuffer().get(),
+              sleep,
+              totalSleep);
+        }
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while waiting for space in 
buffer", e);
+        }
+
+        // Prevents long overflow possibility
+        if (sleepTime < maxWaitTime.getMillis()) {
+          sleepTime *= 2;
+        }
+
+        totalSleep += sleep;
+      }
+    }
+    // Timeout: element skips JVM pool but stays in BagState for timer to 
reschedule later.
+  }
+
+  // Uses hashcode based jitter instead of random for deterministic 
rescheduling
+  // Satisfies lint check
+  private Instant nextTimeToFire(@Nullable K key) {
+    long seed = (key == null) ? 0 : key.hashCode();
+    double fractionalOffset = Math.abs(seed % 1000000) / 1000000.0;
+    double timerFrequencySec = timerFrequency.getMillis() / 1000.0;
+    double nowSec = System.currentTimeMillis() / 1000.0;
+
+    double base = Math.floor((nowSec + timerFrequencySec) / timerFrequencySec) 
* timerFrequencySec;
+    double offset = fractionalOffset * timerFrequencySec;
+
+    return Instant.ofEpochMilli((long) ((base + offset) * 1000));
+  }
+
+  @ProcessElement
+  public void processElement(
+      ProcessContext c,
+      BoundedWindow window,
+      @StateId("to_process") BagState<KV<K, InputT>> toProcessState,
+      @TimerId("timer") Timer timer) {
+
+    KV<K, InputT> element = c.element();
+    scheduleItem(element, window, c.timestamp());
+    toProcessState.add(element);
+
+    Instant timeToFire = nextTimeToFire(element.getKey());
+    timer.set(timeToFire);
+  }
+
+  @OnTimer("timer")
+  public void onTimer(
+      OnTimerContext c,
+      @StateId("to_process") BagState<KV<K, InputT>> toProcessState,
+      @TimerId("timer") Timer timer,
+      OutputReceiver<OutputT> receiver) {
+
+    commitFinishedItems(c.fireTimestamp(), toProcessState, timer, receiver);
+  }
+
+  // Synchronizes local task results with the runner's persistent state 
container.
+  // Emits successfully completed elements, cancels rolled-back tasks, and 
reschedules lost work.
+  void commitFinishedItems(
+      Instant fireTimestamp,
+      BagState<KV<K, InputT>> toProcessState,
+      Timer timer,
+      OutputReceiver<OutputT> receiver) {
+
+    Iterable<KV<K, InputT>> toProcessLocal = toProcessState.read();
+    if (toProcessLocal == null || !toProcessLocal.iterator().hasNext()) {
+      // Early Exit: if BagState is empty, we skip checking activeElements for 
this key.
+      return;
+    }
+
+    // Since fireTimestamp is key-scoped, we determine the current key from 
the first element in
+    // state
+    List<KV<K, InputT>> stateList = new ArrayList<>();
+    K key = null;
+    for (KV<K, InputT> element : toProcessLocal) {
+      stateList.add(element);
+      if (key == null) {
+        key = element.getKey();
+      }
+    }
+
+    if (verboseLogging) {
+      LOG.info("processing timer for key: {}", key);
+    }
+
+    ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = 
getProcessingElements();
+
+    List<List<TimestampedOutput<OutputT>>> toReturn = new ArrayList<>();
+    Set<KV<K, InputT>> finishedItems = new HashSet<>();
+    List<KV<K, InputT>> toReschedule = new ArrayList<>();
+
+    int itemsFinished = 0;
+    int itemsNotYetFinished = 0;
+    int itemsRescheduled = 0;
+
+    Set<Object> finishedElementIds = new HashSet<>();
+    Set<Object> inFlightElementIds = new HashSet<>();
+    Set<Object> rescheduledElementIds = new HashSet<>();
+
+    lock.lock();
+    try {
+      Set<Object> stateElementIds = new HashSet<>();
+      for (KV<K, InputT> element : stateList) {
+        stateElementIds.add(idFn.apply(element.getValue()));
+      }
+
+      List<Object> toCancelIds = new ArrayList<>();
+      for (Map.Entry<Object, InFlightElement<OutputT>> entry : 
activeElements.entrySet()) {
+        InFlightElement<OutputT> inFlight = entry.getValue();
+        if (java.util.Objects.equals(inFlight.key, key)
+            && !stateElementIds.contains(entry.getKey())) {
+          toCancelIds.add(entry.getKey());
+        }
+      }
+
+      for (Object cancelId : toCancelIds) {
+        InFlightElement<OutputT> inFlight = activeElements.get(cancelId);
+        if (inFlight != null) {
+          inFlight.future.cancel(true);
+          activeElements.remove(cancelId);
+        }
+      }
+
+      for (KV<K, InputT> element : stateList) {
+        Object elementId = idFn.apply(element.getValue());
+
+        // Skip processing if we already completed, rescheduled, or found this 
elementId active in
+        // this cycle
+        if (finishedElementIds.contains(elementId)
+            || rescheduledElementIds.contains(elementId)
+            || inFlightElementIds.contains(elementId)) {
+          continue;
+        }
+
+        if (activeElements.containsKey(elementId)) {
+          InFlightElement<OutputT> inFlight = activeElements.get(elementId);
+          if (inFlight.future.isDone()) {
+            try {
+              if (!inFlight.future.isCancelled()) {
+                toReturn.add(inFlight.future.get());
+              }
+              finishedItems.add(element);
+              finishedElementIds.add(elementId);
+              activeElements.remove(elementId);
+              itemsFinished++;
+            } catch (Exception e) {
+              LOG.error("Error executing async task for element {}", element, 
e);
+              throw new RuntimeException("Error executing async task for 
element " + element, e);
+            }
+          } else {

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   In `commitFinishedItems`, when a background task has failed, 
   `inFlight.future.get()` throws and the `catch` block immediately rethrows it as 
   a `RuntimeException`. By that point, however, every sibling task that completed 
   successfully earlier in the same loop has already been removed from 
   `activeElements` (via `activeElements.remove(elementId)`).
   
   The runner retries the whole timer callback on failure and does not commit the 
   state changes made by the failed attempt, so those successfully completed 
   elements are still in `toProcessState` on the retry. The in-memory 
   `activeElements` map is not rolled back, though. Because the elements are no 
   longer in it, the retry treats them as lost tasks and reschedules them. Work that 
   already succeeded is then executed a second time, repeating any side effects of 
   the wrapped `syncFn`.
   
   To avoid this, keep successfully completed tasks in `activeElements` while the 
   loop runs. Remove them only after the whole batch has been processed without 
   error, i.e. at the end of `commitFinishedItems`. When a task fails, remove only 
   that task from `activeElements` (so the retry reschedules it) and then throw.
   
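   ```java
           if (activeElements.containsKey(elementId)) {
             InFlightElement<OutputT> inFlight = activeElements.get(elementId);
             if (inFlight.future.isDone()) {
               try {
                 if (!inFlight.future.isCancelled()) {
                   toReturn.add(inFlight.future.get());
                 }
                 finishedItems.add(element);
                 finishedElementIds.add(elementId);
                 itemsFinished++;
               } catch (Exception e) {
                 // Drop only the failed task so the retried timer reschedules it.
                 activeElements.remove(elementId);
                 LOG.error("Error executing async task for element {}", element, e);
                 throw new RuntimeException(
                     "Error executing async task for element " + element, e);
               }
             } else {
               inFlightElementIds.add(elementId);
               itemsNotYetFinished++;
             }
           }
   ```
   
   Then, once the loop over `stateList` has completed without throwing (still inside 
   the `lock`-guarded section), remove the completed tasks in one pass:
   
   ```java
         for (Object finishedId : finishedElementIds) {
           activeElements.remove(finishedId);
         }
   ```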



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java:
##########
@@ -0,0 +1,740 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantLock;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvoker;
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Class that wraps a dofn and converts it from one which process elements 
synchronously to one
+ * which processes them asynchronously.
+ *
+ * <p>For synchronous dofns the default settings mean that many (100s) of 
elements will be processed
+ * in parallel and that processing an element will block all other work on 
that key. In addition
+ * runners are optimized for latencies less than a few seconds and longer 
operations can result in
+ * high retry rates. Async should be considered when the default parallelism 
is not correct and/or
+ * items are expected to take longer than a few seconds to process.
+ *
+ * <p>/* NOTE: 1) The wrapped syncFn REQUIRES thread-safety if BOTH 
parallelism > 1 and the DoFn is
+ * stateful. 2) Tagged output multi-outputs are unsupported. 3) 
StartBundle/finishBundle are invoked
+ * per element so any batching or aggregation logic will not behave as 
expected.
+ */
+public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, 
OutputT> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class);
+
+  private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10;
+  private static final int DEFAULT_TIMEOUT_SEC = 1;
+  private static final int DEFAULT_MAX_WAIT_TIME_MS = 500;
+  private static final int TEARDOWN_AWAIT_SEC = 5;
+  private static final int INITIAL_BACKOFF_SLEEP_MS = 10;
+  private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000;
+
+  @StateId("to_process")
+  private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec;
+
+  @TimerId("timer")
+  private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
+
+  private final DoFn<InputT, OutputT> syncFn;
+  private final int parallelism;
+  private final Duration timerFrequency;
+  private final int maxItemsToBuffer;
+  private final Duration timeout;
+  private final Duration maxWaitTime;
+  private final SerializableFunction<InputT, Object> idFn;
+  private final boolean useThreadPool;
+  private final String uuid;
+
+  private transient volatile @Nullable PipelineOptions pipelineOptions;
+
+  // Shared JVM-Wide States (Static Registries)
+  // Map-backed registry holding shared resources across serialized worker 
instances. Since runners
+  // clone DoFn instances on the same worker node, static maps ensure safe 
JVM-wide resource reuse.
+  private static final ConcurrentHashMap<String, ExecutorService> pool = new 
ConcurrentHashMap<>();
+  // activeElements (processingElements) is global JVM memory (all keys)
+  private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, 
InFlightElement<?>>>
+      processingElements = new ConcurrentHashMap<>();
+  private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer =
+      new ConcurrentHashMap<>();
+  // Reference counts for cloned instances sharing the same UUID. Coordinates 
safe,
+  // leak-free thread pool shutdown during teardown without crashing active 
sibling clones.
+  private static final ConcurrentHashMap<String, AtomicInteger> refCounts =
+      new ConcurrentHashMap<>();
+
+  // If contention becomes a bottleneck, this can be replaced with per-uuid 
locks
+  // in a ConcurrentHashMap
+  private static final ReentrantLock lock = new ReentrantLock();
+  private static final boolean verboseLogging = false;
+
+  private static class TimestampedOutput<T> {
+    final T value;
+    final @Nullable Instant timestamp;
+
+    TimestampedOutput(T value, @Nullable Instant timestamp) {
+      this.value = value;
+      this.timestamp = timestamp;
+    }
+  }
+
+  private static class InFlightElement<OutputT> {
+    final @Nullable Object key;
+    final CompletableFuture<List<TimestampedOutput<OutputT>>> future;
+
+    InFlightElement(
+        @Nullable Object key, 
CompletableFuture<List<TimestampedOutput<OutputT>>> future) {
+      this.key = key;
+      this.future = future;
+    }
+  }
+
+  // The In-Memory Accumulating Receiver
+  // Accumulates elements in-memory during asynchronous background worker 
execution.
+  // Buffered elements are only committed downstream once the parent task 
completes successfully
+  // and the timer fires.
+  private static class AccumulatingOutputReceiver<T> implements 
OutputReceiver<T> {
+    private final List<TimestampedOutput<T>> outputs =
+        Collections.synchronizedList(new ArrayList<>());
+
+    @Override
+    public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) {
+      return org.apache.beam.sdk.values.WindowedValues.<T>builder()
+          .setValue(value)
+          .setTimestamp(Instant.now())
+          
.setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE))
+          
.setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING)
+          .setReceiver(
+              windowedValue ->
+                  outputs.add(
+                      new TimestampedOutput<>(
+                          windowedValue.getValue(), 
windowedValue.getTimestamp())));
+    }
+
+    // Bypasses the nested anonymous OutputBuilder instantiation for standard 
outputs.
+    // JVM optimization to prevent garbage collection pressure under high 
pipeline throughput.
+    @Override
+    public void output(T output) {
+      outputs.add(new TimestampedOutput<>(output, null));
+    }
+
+    @Override
+    public void outputWithTimestamp(T output, Instant timestamp) {
+      outputs.add(new TimestampedOutput<>(output, timestamp));
+    }
+
+    public List<T> getOutputs() {
+      List<T> rawOutputs = new ArrayList<>();
+      for (TimestampedOutput<T> out : outputs) {
+        rawOutputs.add(out.value);
+      }
+      return rawOutputs;
+    }
+
+    public List<TimestampedOutput<T>> getTimestampedOutputs() {
+      return outputs;
+    }
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool) {
+    this(
+        syncFn,
+        parallelism,
+        timerFrequency,
+        maxItemsToBuffer,
+        timeout,
+        maxWaitTime,
+        idFn,
+        useThreadPool,
+        null);
+  }
+
+  public AsyncDoFn(
+      DoFn<InputT, OutputT> syncFn,
+      int parallelism,
+      Duration timerFrequency,
+      @Nullable Integer maxItemsToBuffer,
+      @Nullable Duration timeout,
+      @Nullable Duration maxWaitTime,
+      @Nullable SerializableFunction<InputT, Object> idFn,
+      boolean useThreadPool,
+      @Nullable Coder<KV<K, InputT>> coder) {
+    this.syncFn = syncFn;
+    this.parallelism = parallelism;
+    if (timerFrequency.getMillis() <= 0) {
+      throw new IllegalArgumentException("timerFrequency must be greater than 
zero");
+    }
+    this.timerFrequency = timerFrequency;
+    this.maxItemsToBuffer =
+        (maxItemsToBuffer != null)
+            ? maxItemsToBuffer
+            : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY);
+    this.timeout = (timeout != null) ? timeout : 
Duration.standardSeconds(DEFAULT_TIMEOUT_SEC);
+    this.maxWaitTime =
+        (maxWaitTime != null) ? maxWaitTime : 
Duration.millis(DEFAULT_MAX_WAIT_TIME_MS);
+    this.idFn =
+        (idFn != null)
+            ? idFn
+            : (SerializableFunction<InputT, Object>)
+                input -> java.util.Objects.requireNonNull(input);
+    this.useThreadPool = useThreadPool;
+    this.uuid = UUID.randomUUID().toString();
+    this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : 
StateSpecs.bag();
+  }
+
+  private ExecutorService getThreadPool() {
+    ExecutorService threadPool = pool.get(uuid);
+    if (threadPool == null) {
+      throw new IllegalStateException("Thread pool not initialized for UUID: " 
+ uuid);
+    }
+    return threadPool;
+  }
+
+  @SuppressWarnings("unchecked")
+  private ConcurrentHashMap<Object, InFlightElement<OutputT>> 
getProcessingElements() {
+    ConcurrentHashMap<Object, InFlightElement<?>> elements = 
processingElements.get(uuid);
+    if (elements == null) {
+      throw new IllegalStateException("Processing elements map not initialized 
for UUID: " + uuid);
+    }
+    return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) 
(ConcurrentHashMap<?, ?>) elements;
+  }
+
+  private AtomicInteger getItemsInBuffer() {
+    AtomicInteger buffer = itemsInBuffer.get(uuid);
+    if (buffer == null) {
+      throw new IllegalStateException("Buffer counter not initialized for 
UUID: " + uuid);
+    }
+    return buffer;
+  }
+
+  @Setup
+  public void setup(PipelineOptions options) {
+    this.pipelineOptions = options;
+
+    // Setup the wrapped DoFn
+    DoFnInvokers.invokerFor(syncFn)
+        .invokeSetup(
+            new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() {
+              @Override
+              public PipelineOptions pipelineOptions() {
+                return options;
+              }
+
+              @Override
+              public String getErrorContext() {
+                return "AsyncDoFn/Setup";
+              }
+            });
+
+    if (useThreadPool) {
+      LOG.info("Using thread pool for asynchronous execution with parallelism 
{}", parallelism);
+    }
+
+    lock.lock();
+    try {
+      pool.computeIfAbsent(uuid, k -> 
Executors.newFixedThreadPool(parallelism));
+      processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>());
+      itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0));
+      refCounts.computeIfAbsent(uuid, k -> new 
AtomicInteger(0)).incrementAndGet();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  // Clean up JVM-wide shared resources to prevent thread leaks on the worker
+  @Teardown
+  public void teardown() {
+    DoFnInvokers.invokerFor(syncFn).invokeTeardown();
+    ExecutorService threadPool = null;
+    lock.lock();
+    try {
+      AtomicInteger refCount = refCounts.get(uuid);
+      if (refCount != null && refCount.decrementAndGet() == 0) {
+        refCounts.remove(uuid);
+        threadPool = pool.remove(uuid);
+        processingElements.remove(uuid);
+        itemsInBuffer.remove(uuid);
+      }
+    } finally {
+      lock.unlock();
+    }
+    if (threadPool != null) {
+      threadPool.shutdown();
+      try {
+        if (!threadPool.awaitTermination(TEARDOWN_AWAIT_SEC, 
TimeUnit.SECONDS)) {
+          threadPool.shutdownNow();
+        }
+      } catch (InterruptedException e) {
+        threadPool.shutdownNow();
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
+  // Asynchronous Scheduling & Deduplication
+  // Submits tasks to the background thread pool. If an element with the same 
ID is already
+  // in-flight,
+  // the submission is silently ignored to enforce exactly-once semantics.
+  private boolean scheduleIfRoom(
+      KV<K, InputT> element, BoundedWindow window, Instant timestamp, boolean 
ignoreBuffer) {
+    lock.lock();
+    try {
+      ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = 
getProcessingElements();
+      Object elementId = idFn.apply(element.getValue());
+
+      if (activeElements.containsKey(elementId)) {
+        LOG.info("Item {} already in processing elements", element);
+        return true;
+      }
+
+      int currentBuffer = getItemsInBuffer().get();
+      if (currentBuffer < maxItemsToBuffer || ignoreBuffer) {
+        java.util.concurrent.Executor executor =
+            useThreadPool ? getThreadPool() : 
java.util.concurrent.ForkJoinPool.commonPool();
+
+        // Pending asynchronous task that will produce a list of outputs
+        CompletableFuture<List<TimestampedOutput<OutputT>>> future =
+            CompletableFuture.supplyAsync(
+                () -> {
+                  try {
+                    AccumulatingOutputReceiver<OutputT> receiver =
+                        new AccumulatingOutputReceiver<>();
+                    DoFnInvoker<InputT, OutputT> invoker = 
DoFnInvokers.invokerFor(syncFn);
+
+                    DoFnInvoker.ArgumentProvider<InputT, OutputT> 
bundleArgProvider =
+                        new DoFnInvoker.BaseArgumentProvider<InputT, 
OutputT>() {
+                          @Override
+                          public PipelineOptions pipelineOptions() {
+                            PipelineOptions options = pipelineOptions;
+                            if (options == null) {
+                              throw new IllegalStateException("PipelineOptions 
not set");
+                            }
+                            return options;
+                          }
+
+                          @Override
+                          public DoFn<InputT, OutputT>.FinishBundleContext 
finishBundleContext(
+                              DoFn<InputT, OutputT> doFn) {
+                            return doFn.new FinishBundleContext() {
+                              @Override
+                              public PipelineOptions getPipelineOptions() {
+                                return pipelineOptions();
+                              }
+
+                              @Override
+                              public void output(
+                                  OutputT output, Instant timestamp, 
BoundedWindow window) {
+                                receiver.outputWithTimestamp(output, 
timestamp);
+                              }
+
+                              @Override
+                              public <T> void output(
+                                  TupleTag<T> tag,
+                                  T output,
+                                  Instant timestamp,
+                                  BoundedWindow window) {
+                                throw new UnsupportedOperationException(
+                                    "Tagged output not supported in "
+                                        + "FinishBundleContext for AsyncDoFn");
+                              }
+                            };
+                          }
+
+                          @Override
+                          public String getErrorContext() {
+                            return "AsyncDoFn/Bundle";
+                          }
+                        };
+
+                    invoker.invokeStartBundle(bundleArgProvider);
+
+                    DoFnInvoker.ArgumentProvider<InputT, OutputT> 
processArgProvider =
+                        new DoFnInvoker.BaseArgumentProvider<InputT, 
OutputT>() {
+                          @Override
+                          public InputT element(DoFn<InputT, OutputT> doFn) {
+                            return element.getValue();
+                          }
+
+                          @Override
+                          public OutputReceiver<OutputT> outputReceiver(
+                              DoFn<InputT, OutputT> doFn) {
+                            return receiver;
+                          }
+
+                          @Override
+                          public BoundedWindow window() {
+                            return window;
+                          }
+
+                          @Override
+                          public Instant timestamp(DoFn<InputT, OutputT> doFn) 
{
+                            return timestamp;
+                          }
+
+                          @Override
+                          public PipelineOptions pipelineOptions() {
+                            PipelineOptions options = pipelineOptions;
+                            if (options == null) {
+                              throw new IllegalStateException("PipelineOptions 
not set");
+                            }
+                            return options;
+                          }
+
+                          @Override
+                          public String getErrorContext() {
+                            return "AsyncDoFn/Process";
+                          }
+                        };
+
+                    invoker.invokeProcessElement(processArgProvider);
+                    invoker.invokeFinishBundle(bundleArgProvider);
+
+                    return receiver.getTimestampedOutputs();
+                  } catch (Exception e) {
+                    throw new CompletionException(e);
+                  }
+                },
+                executor);
+
+        // Assigned to 'unused' to satisfy ErrorProne while preserving parent 
future for
+        // cancellation
+        CompletableFuture<List<TimestampedOutput<OutputT>>> unused =
+            future.whenComplete(
+                (res, ex) -> {
+                  getItemsInBuffer().decrementAndGet();
+                });
+
+        activeElements.put(elementId, new InFlightElement<>(element.getKey(), 
future));
+        getItemsInBuffer().incrementAndGet();
+        return true;
+      }
+
+      return false;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  private void scheduleItem(KV<K, InputT> element, BoundedWindow window, 
Instant timestamp) {
+    boolean done = false;
+    long sleepTime = INITIAL_BACKOFF_SLEEP_MS;
+    long totalSleep = 0;
+    long timeoutMs = timeout.getMillis();
+
+    while (!done && totalSleep < timeoutMs) {
+      done = scheduleIfRoom(element, window, timestamp, false);
+      if (!done) {
+        long sleep = Math.min(maxWaitTime.getMillis(), sleepTime);
+        if (verboseLogging || totalSleep > BACKPRESSURE_LOG_THRESHOLD_MS) {
+          LOG.info(
+              "buffer is full for item {}, {} waiting {} ms. Have waited for 
{} ms.",
+              element,
+              getItemsInBuffer().get(),
+              sleep,
+              totalSleep);
+        }
+        try {
+          Thread.sleep(sleep);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          throw new RuntimeException("Interrupted while waiting for space in 
buffer", e);
+        }
+
+        // Prevents long overflow possibility
+        if (sleepTime < maxWaitTime.getMillis()) {
+          sleepTime *= 2;
+        }
+
+        totalSleep += sleep;
+      }
+    }
+    // Timeout: element skips JVM pool but stays in BagState for timer to 
reschedule later.
+  }
+
+  // Uses hashcode based jitter instead of random for deterministic 
rescheduling
+  // Satisfies lint check
+  private Instant nextTimeToFire(@Nullable K key) {
+    long seed = (key == null) ? 0 : key.hashCode();
+    double fractionalOffset = Math.abs(seed % 1000000) / 1000000.0;
+    double timerFrequencySec = timerFrequency.getMillis() / 1000.0;
+    double nowSec = System.currentTimeMillis() / 1000.0;
+
+    double base = Math.floor((nowSec + timerFrequencySec) / timerFrequencySec) 
* timerFrequencySec;
+    double offset = fractionalOffset * timerFrequencySec;
+
+    return Instant.ofEpochMilli((long) ((base + offset) * 1000));
+  }
+
+  @ProcessElement
+  public void processElement(
+      ProcessContext c,
+      BoundedWindow window,
+      @StateId("to_process") BagState<KV<K, InputT>> toProcessState,
+      @TimerId("timer") Timer timer) {
+
+    KV<K, InputT> element = c.element();
+    scheduleItem(element, window, c.timestamp());
+    toProcessState.add(element);
+
+    Instant timeToFire = nextTimeToFire(element.getKey());
+    timer.set(timeToFire);
+  }
+
+  @OnTimer("timer")
+  public void onTimer(
+      OnTimerContext c,
+      @StateId("to_process") BagState<KV<K, InputT>> toProcessState,
+      @TimerId("timer") Timer timer,
+      OutputReceiver<OutputT> receiver) {
+
+    commitFinishedItems(c.fireTimestamp(), toProcessState, timer, receiver);
+  }
+
+  // Synchronizes local task results with the runner's persistent state 
container.
+  // Emits successfully completed elements, cancels rolled-back tasks, and 
reschedules lost work.
+  void commitFinishedItems(
+      Instant fireTimestamp,
+      BagState<KV<K, InputT>> toProcessState,
+      Timer timer,
+      OutputReceiver<OutputT> receiver) {
+
+    Iterable<KV<K, InputT>> toProcessLocal = toProcessState.read();
+    if (toProcessLocal == null || !toProcessLocal.iterator().hasNext()) {
+      // Early Exit: if BagState is empty, we skip checking activeElements for 
this key.
+      return;
+    }
+
+    // Since fireTimestamp is key-scoped, we determine the current key from 
the first element in
+    // state
+    List<KV<K, InputT>> stateList = new ArrayList<>();
+    K key = null;
+    for (KV<K, InputT> element : toProcessLocal) {
+      stateList.add(element);
+      if (key == null) {
+        key = element.getKey();
+      }
+    }
+
+    if (verboseLogging) {
+      LOG.info("processing timer for key: {}", key);
+    }
+
+    ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = 
getProcessingElements();
+
+    List<List<TimestampedOutput<OutputT>>> toReturn = new ArrayList<>();
+    Set<KV<K, InputT>> finishedItems = new HashSet<>();
+    List<KV<K, InputT>> toReschedule = new ArrayList<>();
+
+    int itemsFinished = 0;
+    int itemsNotYetFinished = 0;
+    int itemsRescheduled = 0;
+
+    Set<Object> finishedElementIds = new HashSet<>();
+    Set<Object> inFlightElementIds = new HashSet<>();
+    Set<Object> rescheduledElementIds = new HashSet<>();
+
+    lock.lock();
+    try {
+      Set<Object> stateElementIds = new HashSet<>();
+      for (KV<K, InputT> element : stateList) {
+        stateElementIds.add(idFn.apply(element.getValue()));
+      }
+
+      List<Object> toCancelIds = new ArrayList<>();
+      for (Map.Entry<Object, InFlightElement<OutputT>> entry : 
activeElements.entrySet()) {
+        InFlightElement<OutputT> inFlight = entry.getValue();
+        if (java.util.Objects.equals(inFlight.key, key)
+            && !stateElementIds.contains(entry.getKey())) {
+          toCancelIds.add(entry.getKey());
+        }
+      }
+
+      for (Object cancelId : toCancelIds) {
+        InFlightElement<OutputT> inFlight = activeElements.get(cancelId);
+        if (inFlight != null) {
+          inFlight.future.cancel(true);
+          activeElements.remove(cancelId);
+        }
+      }
+
+      for (KV<K, InputT> element : stateList) {
+        Object elementId = idFn.apply(element.getValue());
+
+        // Skip processing if we already completed, rescheduled, or found this 
elementId active in
+        // this cycle
+        if (finishedElementIds.contains(elementId)
+            || rescheduledElementIds.contains(elementId)
+            || inFlightElementIds.contains(elementId)) {
+          continue;
+        }
+
+        if (activeElements.containsKey(elementId)) {
+          InFlightElement<OutputT> inFlight = activeElements.get(elementId);
+          if (inFlight.future.isDone()) {
+            try {
+              if (!inFlight.future.isCancelled()) {
+                toReturn.add(inFlight.future.get());
+              }
+              finishedItems.add(element);
+              finishedElementIds.add(elementId);
+              activeElements.remove(elementId);
+              itemsFinished++;
+            } catch (Exception e) {
+              LOG.error("Error executing async task for element {}", element, 
e);
+              throw new RuntimeException("Error executing async task for 
element " + element, e);
+            }
+          } else {
+            inFlightElementIds.add(elementId);
+            itemsNotYetFinished++;
+          }
+        } else {
+          LOG.info(
+              "Item {} found in state but not in local active elements, 
scheduling now", element);
+          toReschedule.add(element);
+          rescheduledElementIds.add(elementId);
+          itemsRescheduled++;
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+
+    // Reschedule missing elements
+    for (KV<K, InputT> element : toReschedule) {
+      scheduleItem(element, GlobalWindow.INSTANCE, fireTimestamp);
+    }
+
+    // Update State: keep only unfinished items
+    toProcessState.clear();
+    int itemsInProcessingState = 0;
+    for (KV<K, InputT> element : stateList) {
+      if (!finishedItems.contains(element)) {
+        toProcessState.add(element);
+        itemsInProcessingState++;
+      }
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   In `commitFinishedItems`, `toProcessState` is rebuilt by checking
   `!finishedItems.contains(element)`. Because `finishedItems` is a
   `Set<KV<K, InputT>>`, this relies on `InputT` implementing `equals` and
   `hashCode` by value. If `InputT` is a byte array or a complex object that
   does not override `equals`, the check can miss elements that should be
   dropped. For example, a second copy of an element whose ID was already
   marked finished is skipped by the loop and never added to `finishedItems`.
   It therefore stays in state and is later rescheduled, leading to duplicate
   processing or state corruption.
   
   We should instead filter out finished elements with
   `finishedElementIds.contains(elementId)`, using the user-provided `idFn`,
   which is meant to uniquely identify elements.
   
   ```java
   for (KV<K, InputT> element : stateList) {
     Object elementId = idFn.apply(element.getValue());
     if (!finishedElementIds.contains(elementId)) {
       toProcessState.add(element);
       itemsInProcessingState++;
     }
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to