gemini-code-assist[bot] commented on code in PR #38609: URL: https://github.com/apache/beam/pull/38609#discussion_r3321007943
########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; 
+import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps a dofn and converts it from one which process elements synchronously to one + * which processes them asynchronously. + * + * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed + * in parallel and that processing an element will block all other work on that key. In addition + * runners are optimized for latencies less than a few seconds and longer operations can result in + * high retry rates. Async should be considered when the default parallelism is not correct and/or + * items are expected to take longer than a few seconds to process. + * + * <p>/* NOTE: 1) The wrapped syncFn requires thread-safety ONLY if BOTH parallelism > 1 AND the + * DoFn is stateful (keeps instance state). 2) Tagged output multi-outputs are unsupported. 3) + * StartBundle/finishBundle are invoked per element so any batching or aggregation logic will not + * behave as expected. 
+ */ +public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class); + + private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10; + private static final int DEFAULT_TIMEOUT_SEC = 1; + private static final int DEFAULT_MAX_WAIT_TIME_MS = 500; + private static final int TEARDOWN_AWAIT_SEC = 5; + private static final int INITIAL_BACKOFF_SLEEP_MS = 10; + private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000; + + @StateId("to_process") + private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec; + + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final DoFn<InputT, OutputT> syncFn; + private final int parallelism; + private final Duration timerFrequency; + private final int maxItemsToBuffer; + private final Duration timeout; + private final Duration maxWaitTime; + private final SerializableFunction<InputT, Object> idFn; + private final boolean useThreadPool; + private final String uuid; + + private transient volatile @Nullable PipelineOptions pipelineOptions; + + // Shared JVM-Wide States (Static Registries) + // Map-backed registry holding shared resources across serialized worker instances. Since runners + // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse. 
+ private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>(); + // activeElements (processingElements) is global JVM memory (all keys) + private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>> + processingElements = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> refCounts = + new ConcurrentHashMap<>(); + + private static final ReentrantLock lock = new ReentrantLock(); Review Comment:  The 'lock' is defined as a static member: 'private static final ReentrantLock lock = new ReentrantLock();'. Because this lock is static, **all** instances of 'AsyncDoFn' across the entire JVM share the same lock. Every element processed by any 'AsyncDoFn' step in the pipeline will contend for this single JVM-wide lock during 'scheduleIfRoom' and 'commitFinishedItems'. This introduces a massive performance bottleneck and defeats the purpose of asynchronous, high-throughput processing.

Instead of a single static lock, use a lock per 'uuid' (e.g., stored in a static 'ConcurrentHashMap<String, ReentrantLock>') or make the lock instance-scoped if 'uuid' isolation is sufficient.

```java
  private static final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

  private ReentrantLock getLock() {
    return locks.computeIfAbsent(uuid, k -> new ReentrantLock());
  }
```

########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; +import 
org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps a dofn and converts it from one which process elements synchronously to one + * which processes them asynchronously. + * + * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed + * in parallel and that processing an element will block all other work on that key. In addition + * runners are optimized for latencies less than a few seconds and longer operations can result in + * high retry rates. Async should be considered when the default parallelism is not correct and/or + * items are expected to take longer than a few seconds to process. + * + * <p>/* NOTE: 1) The wrapped syncFn requires thread-safety ONLY if BOTH parallelism > 1 AND the + * DoFn is stateful (keeps instance state). 2) Tagged output multi-outputs are unsupported. 3) + * StartBundle/finishBundle are invoked per element so any batching or aggregation logic will not + * behave as expected. 
+ */ +public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class); + + private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10; + private static final int DEFAULT_TIMEOUT_SEC = 1; + private static final int DEFAULT_MAX_WAIT_TIME_MS = 500; + private static final int TEARDOWN_AWAIT_SEC = 5; + private static final int INITIAL_BACKOFF_SLEEP_MS = 10; + private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000; + + @StateId("to_process") + private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec; + + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final DoFn<InputT, OutputT> syncFn; + private final int parallelism; + private final Duration timerFrequency; + private final int maxItemsToBuffer; + private final Duration timeout; + private final Duration maxWaitTime; + private final SerializableFunction<InputT, Object> idFn; + private final boolean useThreadPool; + private final String uuid; + + private transient volatile @Nullable PipelineOptions pipelineOptions; + + // Shared JVM-Wide States (Static Registries) + // Map-backed registry holding shared resources across serialized worker instances. Since runners + // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse. 
+ private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>(); + // activeElements (processingElements) is global JVM memory (all keys) + private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>> + processingElements = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> refCounts = + new ConcurrentHashMap<>(); + + private static final ReentrantLock lock = new ReentrantLock(); + private static final boolean verboseLogging = false; + + private static class TimestampedOutput<T> { + final T value; + final @Nullable Instant timestamp; + + TimestampedOutput(T value, @Nullable Instant timestamp) { + this.value = value; + this.timestamp = timestamp; + } + } + + private static class InFlightElement<OutputT> { + final CompletableFuture<List<TimestampedOutput<OutputT>>> future; + + InFlightElement(CompletableFuture<List<TimestampedOutput<OutputT>>> future) { + this.future = future; + } + } + + // The In-Memory Accumulating Receiver + // Accumulates elements in-memory during asynchronous background worker execution. + // Buffered elements are only committed downstream once the parent task completes successfully + // and the timer fires. 
+ private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { + private final List<TimestampedOutput<T>> outputs = + Collections.synchronizedList(new ArrayList<>()); + + @Override + public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { + return org.apache.beam.sdk.values.WindowedValues.<T>builder() + .setValue(value) + .setTimestamp(Instant.now()) + .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) + .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) + .setReceiver( + windowedValue -> + outputs.add( + new TimestampedOutput<>( + windowedValue.getValue(), windowedValue.getTimestamp()))); + } + + // Bypasses the nested anonymous OutputBuilder instantiation for standard outputs. + // JVM optimization to prevent garbage collection pressure under high pipeline throughput. + @Override + public void output(T output) { + outputs.add(new TimestampedOutput<>(output, null)); + } + + @Override + public void outputWithTimestamp(T output, Instant timestamp) { + outputs.add(new TimestampedOutput<>(output, timestamp)); + } + + public List<T> getOutputs() { + List<T> rawOutputs = new ArrayList<>(); + for (TimestampedOutput<T> out : outputs) { + rawOutputs.add(out.value); + } + return rawOutputs; + } + + public List<TimestampedOutput<T>> getTimestampedOutputs() { + return outputs; + } + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + @Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool) { + this( + syncFn, + parallelism, + timerFrequency, + maxItemsToBuffer, + timeout, + maxWaitTime, + idFn, + useThreadPool, + null); + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + 
@Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool, + @Nullable Coder<KV<K, InputT>> coder) { + this.syncFn = syncFn; + this.parallelism = parallelism; + if (timerFrequency.getMillis() <= 0) { + throw new IllegalArgumentException("timerFrequency must be greater than zero"); + } + this.timerFrequency = timerFrequency; + this.maxItemsToBuffer = + (maxItemsToBuffer != null) + ? maxItemsToBuffer + : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY); + this.timeout = (timeout != null) ? timeout : Duration.standardSeconds(DEFAULT_TIMEOUT_SEC); + this.maxWaitTime = + (maxWaitTime != null) ? maxWaitTime : Duration.millis(DEFAULT_MAX_WAIT_TIME_MS); + this.idFn = + (idFn != null) + ? idFn + : (SerializableFunction<InputT, Object>) + input -> java.util.Objects.requireNonNull(input); + this.useThreadPool = useThreadPool; + this.uuid = UUID.randomUUID().toString(); + this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : StateSpecs.bag(); + } + + private ExecutorService getThreadPool() { + ExecutorService threadPool = pool.get(uuid); + if (threadPool == null) { + throw new IllegalStateException("Thread pool not initialized for UUID: " + uuid); + } + return threadPool; + } + + @SuppressWarnings("unchecked") + private ConcurrentHashMap<Object, InFlightElement<OutputT>> getProcessingElements() { + ConcurrentHashMap<Object, InFlightElement<?>> elements = processingElements.get(uuid); + if (elements == null) { + throw new IllegalStateException("Processing elements map not initialized for UUID: " + uuid); + } + return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) (ConcurrentHashMap<?, ?>) elements; + } + + private AtomicInteger getItemsInBuffer() { + AtomicInteger buffer = itemsInBuffer.get(uuid); + if (buffer == null) { + throw new IllegalStateException("Buffer counter not initialized for UUID: " + uuid); + } + return buffer; + } + + @Setup + public void setup(PipelineOptions options) { + this.pipelineOptions = options; + + // Setup the 
wrapped DoFn + DoFnInvokers.invokerFor(syncFn) + .invokeSetup( + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public PipelineOptions pipelineOptions() { + return options; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Setup"; + } + }); + + if (useThreadPool) { + LOG.info("Using thread pool for asynchronous execution with parallelism {}", parallelism); + } + + lock.lock(); + try { + pool.computeIfAbsent(uuid, k -> Executors.newFixedThreadPool(parallelism)); + processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>()); + itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0)); + refCounts.computeIfAbsent(uuid, k -> new AtomicInteger(0)).incrementAndGet(); + } finally { + lock.unlock(); + } + } + + // Clean up JVM-wide shared resources to prevent thread leaks on the worker + @Teardown + public void teardown() { + DoFnInvokers.invokerFor(syncFn).invokeTeardown(); + ExecutorService threadPool = null; + lock.lock(); + try { + AtomicInteger refCount = refCounts.get(uuid); + if (refCount != null && refCount.decrementAndGet() == 0) { + refCounts.remove(uuid); + threadPool = pool.remove(uuid); + processingElements.remove(uuid); + itemsInBuffer.remove(uuid); + } + } finally { + lock.unlock(); + } + if (threadPool != null) { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(TEARDOWN_AWAIT_SEC, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + } + } catch (InterruptedException e) { + threadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + // Asynchronous Scheduling & Deduplication + // Submits tasks to the background thread pool. If an element with the same ID is already + // in-flight, + // the submission is silently ignored to enforce exactly-once semantics. 
+ private boolean scheduleIfRoom( + KV<K, InputT> element, BoundedWindow window, Instant timestamp, boolean ignoreBuffer) { + lock.lock(); + try { + ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = getProcessingElements(); + Object elementId = idFn.apply(element.getValue()); + + if (activeElements.containsKey(elementId)) { + LOG.info("Item {} already in processing elements", element); + return true; + } + + int currentBuffer = getItemsInBuffer().get(); + if (currentBuffer < maxItemsToBuffer || ignoreBuffer) { + java.util.concurrent.Executor executor = + useThreadPool ? getThreadPool() : java.util.concurrent.ForkJoinPool.commonPool(); + + // Pending asynchronous task that will produce a list of outputs + CompletableFuture<List<TimestampedOutput<OutputT>>> future = + CompletableFuture.supplyAsync( + () -> { + try { + AccumulatingOutputReceiver<OutputT> receiver = + new AccumulatingOutputReceiver<>(); + DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.invokerFor(syncFn); + + DoFnInvoker.ArgumentProvider<InputT, OutputT> bundleArgProvider = + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public PipelineOptions pipelineOptions() { + PipelineOptions options = pipelineOptions; + if (options == null) { + throw new IllegalStateException("PipelineOptions not set"); + } + return options; + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + return doFn.new FinishBundleContext() { + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions(); + } + + @Override + public void output( + OutputT output, Instant timestamp, BoundedWindow window) { + receiver.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void output( + TupleTag<T> tag, + T output, + Instant timestamp, + BoundedWindow window) { + throw new UnsupportedOperationException( + "Tagged output not supported in " + + "FinishBundleContext for AsyncDoFn"); + } + 
}; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Bundle"; + } + }; + + invoker.invokeStartBundle(bundleArgProvider); + + DoFnInvoker.ArgumentProvider<InputT, OutputT> processArgProvider = + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public InputT element(DoFn<InputT, OutputT> doFn) { + return element.getValue(); + } + + @Override + public OutputReceiver<OutputT> outputReceiver( + DoFn<InputT, OutputT> doFn) { + return receiver; + } + + @Override + public BoundedWindow window() { + return window; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public PipelineOptions pipelineOptions() { + PipelineOptions options = pipelineOptions; + if (options == null) { + throw new IllegalStateException("PipelineOptions not set"); + } + return options; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Process"; + } + }; + + invoker.invokeProcessElement(processArgProvider); + invoker.invokeFinishBundle(bundleArgProvider); + + return receiver.getTimestampedOutputs(); + } catch (Exception e) { + throw new CompletionException(e); + } + }, + executor); + + // Assigned to 'unused' to satisfy ErrorProne while preserving parent future for + // cancellation + CompletableFuture<List<TimestampedOutput<OutputT>>> unused = + future.whenComplete( + (res, ex) -> { + lock.lock(); + try { + getItemsInBuffer().decrementAndGet(); + } finally { + lock.unlock(); + } + }); + + activeElements.put(elementId, new InFlightElement<>(future)); + getItemsInBuffer().incrementAndGet(); + return true; + } + + return false; + } finally { + lock.unlock(); + } + } + + private void scheduleItem(KV<K, InputT> element, BoundedWindow window, Instant timestamp) { + boolean done = false; + long sleepTime = INITIAL_BACKOFF_SLEEP_MS; + long totalSleep = 0; + long timeoutMs = timeout.getMillis(); + + while (!done && totalSleep < timeoutMs) { + done = scheduleIfRoom(element, 
window, timestamp, false); + if (!done) { + long sleep = Math.min(maxWaitTime.getMillis(), sleepTime); + if (verboseLogging || totalSleep > BACKPRESSURE_LOG_THRESHOLD_MS) { + LOG.info( + "buffer is full for item {}, {} waiting {} ms. Have waited for {} ms.", + element, + getItemsInBuffer().get(), + sleep, + totalSleep); + } + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for space in buffer", e); + } + + // Prevents long overflow possibility + if (sleepTime < maxWaitTime.getMillis()) { + sleepTime *= 2; + } + + totalSleep += sleep; + } + } + // Timeout: element skips JVM pool but stays in BagState for timer to reschedule later. + } + + private Instant nextTimeToFire(@Nullable K key) { + long seed = (key == null) ? 0 : key.hashCode(); + double fractionalOffset = Math.abs(seed % 1000000) / 1000000.0; + double timerFrequencySec = timerFrequency.getMillis() / 1000.0; + double nowSec = System.currentTimeMillis() / 1000.0; + + double base = Math.floor((nowSec + timerFrequencySec) / timerFrequencySec) * timerFrequencySec; + double offset = fractionalOffset * timerFrequencySec; + + return Instant.ofEpochMilli((long) ((base + offset) * 1000)); + } + + @ProcessElement + public void processElement( + ProcessContext c, + BoundedWindow window, + @StateId("to_process") BagState<KV<K, InputT>> toProcessState, + @TimerId("timer") Timer timer) { + + KV<K, InputT> element = c.element(); + scheduleItem(element, window, c.timestamp()); + toProcessState.add(element); + + Instant timeToFire = nextTimeToFire(element.getKey()); + timer.set(timeToFire); + } + + @OnTimer("timer") + public void onTimer( + OnTimerContext c, + @StateId("to_process") BagState<KV<K, InputT>> toProcessState, + @TimerId("timer") Timer timer, + OutputReceiver<OutputT> receiver) { + + commitFinishedItems(c.fireTimestamp(), toProcessState, timer, receiver); + } + + // Synchronizes local task results with 
the runner's persistent state container. + // Emits successfully completed elements, cancels rolled-back tasks, and reschedules lost work. + void commitFinishedItems( + Instant fireTimestamp, + BagState<KV<K, InputT>> toProcessState, + Timer timer, + OutputReceiver<OutputT> receiver) { + + Iterable<KV<K, InputT>> toProcessLocal = toProcessState.read(); + if (toProcessLocal == null || !toProcessLocal.iterator().hasNext()) { + // Early Exit: if BagState is empty, we skip checking activeElements for this key. + return; + } + + // Since fireTimestamp is key-scoped, we determine the current key from the first element in + // state + List<KV<K, InputT>> stateList = new ArrayList<>(); + K key = null; + for (KV<K, InputT> element : toProcessLocal) { + stateList.add(element); + if (key == null) { + key = element.getKey(); + } + } + + if (verboseLogging) { + LOG.info("processing timer for key: {}", key); + } + + ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = getProcessingElements(); + + List<List<TimestampedOutput<OutputT>>> toReturn = new ArrayList<>(); + Set<KV<K, InputT>> finishedItems = new HashSet<>(); + List<KV<K, InputT>> toReschedule = new ArrayList<>(); + + int itemsFinished = 0; + int itemsNotYetFinished = 0; + int itemsRescheduled = 0; + + Set<Object> finishedElementIds = new HashSet<>(); + Set<Object> inFlightElementIds = new HashSet<>(); + Set<Object> rescheduledElementIds = new HashSet<>(); + + lock.lock(); + try { + for (KV<K, InputT> element : stateList) { + Object elementId = idFn.apply(element.getValue()); + + // Skip processing if we already completed, rescheduled, or found this elementId active in + // this cycle + if (finishedElementIds.contains(elementId) + || rescheduledElementIds.contains(elementId) + || inFlightElementIds.contains(elementId)) { + continue; + } + + if (activeElements.containsKey(elementId)) { + InFlightElement<OutputT> inFlight = activeElements.get(elementId); + if (inFlight.future.isDone()) { + try { + if 
(!inFlight.future.isCancelled()) { + toReturn.add(inFlight.future.get()); + } + finishedItems.add(element); + finishedElementIds.add(elementId); + activeElements.remove(elementId); + itemsFinished++; + } catch (Exception e) { + LOG.error("Error executing async task for element {}", element, e); + throw new RuntimeException("Error executing async task for element " + element, e); + } + } else { + inFlightElementIds.add(elementId); + itemsNotYetFinished++; + } + } else { + LOG.info( + "Item {} found in state but not in local active elements, scheduling now", element); + toReschedule.add(element); + rescheduledElementIds.add(elementId); + itemsRescheduled++; + } + } + } finally { + lock.unlock(); + } + + // Reschedule missing elements + for (KV<K, InputT> element : toReschedule) { + scheduleItem(element, GlobalWindow.INSTANCE, fireTimestamp); + } Review Comment:  In 'commitFinishedItems' (which runs on the runner's timer execution thread), rescheduled elements are processed by calling 'scheduleItem'. 'scheduleItem' contains a blocking loop with 'Thread.sleep' that waits until there is room in the buffer. Blocking the runner's timer thread is extremely dangerous; it can freeze the entire pipeline, delay watermarks, and cause SDK harness timeouts/crashes.

Instead of blocking, you should attempt to schedule the element once using 'scheduleIfRoom'. If there is no room, do **not** block or sleep. Simply leave the element in 'toProcessState' (by not adding it to 'finishedItems'), and let the next timer firing retry the scheduling.

```suggestion
    // Reschedule missing elements without blocking the timer thread
    for (KV<K, InputT> element : toReschedule) {
      scheduleIfRoom(element, GlobalWindow.INSTANCE, fireTimestamp, false);
    }
```

########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; 
+import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps a dofn and converts it from one which process elements synchronously to one + * which processes them asynchronously. + * + * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed + * in parallel and that processing an element will block all other work on that key. In addition + * runners are optimized for latencies less than a few seconds and longer operations can result in + * high retry rates. Async should be considered when the default parallelism is not correct and/or + * items are expected to take longer than a few seconds to process. + * + * <p>/* NOTE: 1) The wrapped syncFn requires thread-safety ONLY if BOTH parallelism > 1 AND the + * DoFn is stateful (keeps instance state). 2) Tagged output multi-outputs are unsupported. 3) + * StartBundle/finishBundle are invoked per element so any batching or aggregation logic will not + * behave as expected. 
+ */ +public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class); + + private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10; + private static final int DEFAULT_TIMEOUT_SEC = 1; + private static final int DEFAULT_MAX_WAIT_TIME_MS = 500; + private static final int TEARDOWN_AWAIT_SEC = 5; + private static final int INITIAL_BACKOFF_SLEEP_MS = 10; + private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000; + + @StateId("to_process") + private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec; + + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final DoFn<InputT, OutputT> syncFn; + private final int parallelism; + private final Duration timerFrequency; + private final int maxItemsToBuffer; + private final Duration timeout; + private final Duration maxWaitTime; + private final SerializableFunction<InputT, Object> idFn; + private final boolean useThreadPool; + private final String uuid; + + private transient volatile @Nullable PipelineOptions pipelineOptions; + + // Shared JVM-Wide States (Static Registries) + // Map-backed registry holding shared resources across serialized worker instances. Since runners + // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse. + private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>(); + // activeElements (processingElements) is global JVM memory (all keys) + private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>> + processingElements = new ConcurrentHashMap<>(); Review Comment:  The 'processingElements' map is a static 'ConcurrentHashMap' that holds active and completed futures. If a key or window is garbage collected or abandoned by the runner (e.g., due to window expiration), the timer for that key might never fire. 
Consequently, the corresponding elements will never be removed from 'processingElements', leading to a progressive memory leak and eventual 'OutOfMemoryError' in long-running pipelines. To prevent this, use a Guava 'Cache' with an expiration policy (e.g., 'expireAfterWrite' of 1 hour) instead of a raw 'ConcurrentHashMap' for 'processingElements'. This ensures that abandoned or leaked futures are eventually evicted. ```java private static final ConcurrentHashMap<String, com.google.common.cache.Cache<Object, InFlightElement<?>>> processingElements = new ConcurrentHashMap<>(); ``` ########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps a dofn and converts it from one which process elements synchronously to one + * which processes them asynchronously. + * + * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed + * in parallel and that processing an element will block all other work on that key. In addition + * runners are optimized for latencies less than a few seconds and longer operations can result in + * high retry rates. 
Async should be considered when the default parallelism is not correct and/or + * items are expected to take longer than a few seconds to process. + * + * <p>/* NOTE: 1) The wrapped syncFn requires thread-safety ONLY if BOTH parallelism > 1 AND the + * DoFn is stateful (keeps instance state). 2) Tagged output multi-outputs are unsupported. 3) + * StartBundle/finishBundle are invoked per element so any batching or aggregation logic will not + * behave as expected. + */ +public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class); + + private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10; + private static final int DEFAULT_TIMEOUT_SEC = 1; + private static final int DEFAULT_MAX_WAIT_TIME_MS = 500; + private static final int TEARDOWN_AWAIT_SEC = 5; + private static final int INITIAL_BACKOFF_SLEEP_MS = 10; + private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000; + + @StateId("to_process") + private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec; + + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final DoFn<InputT, OutputT> syncFn; + private final int parallelism; + private final Duration timerFrequency; + private final int maxItemsToBuffer; + private final Duration timeout; + private final Duration maxWaitTime; + private final SerializableFunction<InputT, Object> idFn; + private final boolean useThreadPool; + private final String uuid; + + private transient volatile @Nullable PipelineOptions pipelineOptions; + + // Shared JVM-Wide States (Static Registries) + // Map-backed registry holding shared resources across serialized worker instances. Since runners + // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse. 
+ private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>(); + // activeElements (processingElements) is global JVM memory (all keys) + private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>> + processingElements = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> refCounts = + new ConcurrentHashMap<>(); + + private static final ReentrantLock lock = new ReentrantLock(); + private static final boolean verboseLogging = false; + + private static class TimestampedOutput<T> { + final T value; + final @Nullable Instant timestamp; + + TimestampedOutput(T value, @Nullable Instant timestamp) { + this.value = value; + this.timestamp = timestamp; + } + } + + private static class InFlightElement<OutputT> { + final CompletableFuture<List<TimestampedOutput<OutputT>>> future; + + InFlightElement(CompletableFuture<List<TimestampedOutput<OutputT>>> future) { + this.future = future; + } + } + + // The In-Memory Accumulating Receiver + // Accumulates elements in-memory during asynchronous background worker execution. + // Buffered elements are only committed downstream once the parent task completes successfully + // and the timer fires. 
+ private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { + private final List<TimestampedOutput<T>> outputs = + Collections.synchronizedList(new ArrayList<>()); + + @Override + public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { + return org.apache.beam.sdk.values.WindowedValues.<T>builder() + .setValue(value) + .setTimestamp(Instant.now()) + .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) + .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) + .setReceiver( + windowedValue -> + outputs.add( + new TimestampedOutput<>( + windowedValue.getValue(), windowedValue.getTimestamp()))); + } + + // Bypasses the nested anonymous OutputBuilder instantiation for standard outputs. + // JVM optimization to prevent garbage collection pressure under high pipeline throughput. + @Override + public void output(T output) { + outputs.add(new TimestampedOutput<>(output, null)); + } + + @Override + public void outputWithTimestamp(T output, Instant timestamp) { + outputs.add(new TimestampedOutput<>(output, timestamp)); + } + + public List<T> getOutputs() { + List<T> rawOutputs = new ArrayList<>(); + for (TimestampedOutput<T> out : outputs) { + rawOutputs.add(out.value); + } + return rawOutputs; + } + + public List<TimestampedOutput<T>> getTimestampedOutputs() { + return outputs; + } + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + @Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool) { + this( + syncFn, + parallelism, + timerFrequency, + maxItemsToBuffer, + timeout, + maxWaitTime, + idFn, + useThreadPool, + null); + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + 
@Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool, + @Nullable Coder<KV<K, InputT>> coder) { + this.syncFn = syncFn; + this.parallelism = parallelism; + if (timerFrequency.getMillis() <= 0) { + throw new IllegalArgumentException("timerFrequency must be greater than zero"); + } + this.timerFrequency = timerFrequency; + this.maxItemsToBuffer = + (maxItemsToBuffer != null) + ? maxItemsToBuffer + : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY); + this.timeout = (timeout != null) ? timeout : Duration.standardSeconds(DEFAULT_TIMEOUT_SEC); + this.maxWaitTime = + (maxWaitTime != null) ? maxWaitTime : Duration.millis(DEFAULT_MAX_WAIT_TIME_MS); + this.idFn = + (idFn != null) + ? idFn + : (SerializableFunction<InputT, Object>) + input -> java.util.Objects.requireNonNull(input); + this.useThreadPool = useThreadPool; + this.uuid = UUID.randomUUID().toString(); + this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : StateSpecs.bag(); + } + + private ExecutorService getThreadPool() { + ExecutorService threadPool = pool.get(uuid); + if (threadPool == null) { + throw new IllegalStateException("Thread pool not initialized for UUID: " + uuid); + } + return threadPool; + } + + @SuppressWarnings("unchecked") + private ConcurrentHashMap<Object, InFlightElement<OutputT>> getProcessingElements() { + ConcurrentHashMap<Object, InFlightElement<?>> elements = processingElements.get(uuid); + if (elements == null) { + throw new IllegalStateException("Processing elements map not initialized for UUID: " + uuid); + } + return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) (ConcurrentHashMap<?, ?>) elements; + } + + private AtomicInteger getItemsInBuffer() { + AtomicInteger buffer = itemsInBuffer.get(uuid); + if (buffer == null) { + throw new IllegalStateException("Buffer counter not initialized for UUID: " + uuid); + } + return buffer; + } + + @Setup + public void setup(PipelineOptions options) { + this.pipelineOptions = options; + + // Setup the 
wrapped DoFn + DoFnInvokers.invokerFor(syncFn) + .invokeSetup( + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public PipelineOptions pipelineOptions() { + return options; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Setup"; + } + }); + + if (useThreadPool) { + LOG.info("Using thread pool for asynchronous execution with parallelism {}", parallelism); + } + + lock.lock(); + try { + pool.computeIfAbsent(uuid, k -> Executors.newFixedThreadPool(parallelism)); + processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>()); + itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0)); + refCounts.computeIfAbsent(uuid, k -> new AtomicInteger(0)).incrementAndGet(); + } finally { + lock.unlock(); + } + } + + // Clean up JVM-wide shared resources to prevent thread leaks on the worker + @Teardown + public void teardown() { + DoFnInvokers.invokerFor(syncFn).invokeTeardown(); + ExecutorService threadPool = null; + lock.lock(); + try { + AtomicInteger refCount = refCounts.get(uuid); + if (refCount != null && refCount.decrementAndGet() == 0) { + refCounts.remove(uuid); + threadPool = pool.remove(uuid); + processingElements.remove(uuid); + itemsInBuffer.remove(uuid); + } + } finally { + lock.unlock(); + } + if (threadPool != null) { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(TEARDOWN_AWAIT_SEC, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + } + } catch (InterruptedException e) { + threadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + // Asynchronous Scheduling & Deduplication + // Submits tasks to the background thread pool. If an element with the same ID is already + // in-flight, + // the submission is silently ignored to enforce exactly-once semantics. 
+ private boolean scheduleIfRoom( + KV<K, InputT> element, BoundedWindow window, Instant timestamp, boolean ignoreBuffer) { + lock.lock(); + try { + ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = getProcessingElements(); + Object elementId = idFn.apply(element.getValue()); + + if (activeElements.containsKey(elementId)) { + LOG.info("Item {} already in processing elements", element); + return true; + } + + int currentBuffer = getItemsInBuffer().get(); + if (currentBuffer < maxItemsToBuffer || ignoreBuffer) { + java.util.concurrent.Executor executor = + useThreadPool ? getThreadPool() : java.util.concurrent.ForkJoinPool.commonPool(); + + // Pending asynchronous task that will produce a list of outputs + CompletableFuture<List<TimestampedOutput<OutputT>>> future = + CompletableFuture.supplyAsync( + () -> { + try { + AccumulatingOutputReceiver<OutputT> receiver = + new AccumulatingOutputReceiver<>(); + DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.invokerFor(syncFn); + + DoFnInvoker.ArgumentProvider<InputT, OutputT> bundleArgProvider = + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public PipelineOptions pipelineOptions() { + PipelineOptions options = pipelineOptions; + if (options == null) { + throw new IllegalStateException("PipelineOptions not set"); + } + return options; + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + return doFn.new FinishBundleContext() { + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions(); + } + + @Override + public void output( + OutputT output, Instant timestamp, BoundedWindow window) { + receiver.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void output( + TupleTag<T> tag, + T output, + Instant timestamp, + BoundedWindow window) { + throw new UnsupportedOperationException( + "Tagged output not supported in " + + "FinishBundleContext for AsyncDoFn"); + } + 
}; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Bundle"; + } + }; + + invoker.invokeStartBundle(bundleArgProvider); + + DoFnInvoker.ArgumentProvider<InputT, OutputT> processArgProvider = + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public InputT element(DoFn<InputT, OutputT> doFn) { + return element.getValue(); + } + + @Override + public OutputReceiver<OutputT> outputReceiver( + DoFn<InputT, OutputT> doFn) { + return receiver; + } + + @Override + public BoundedWindow window() { + return window; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public PipelineOptions pipelineOptions() { + PipelineOptions options = pipelineOptions; + if (options == null) { + throw new IllegalStateException("PipelineOptions not set"); + } + return options; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Process"; + } + }; + + invoker.invokeProcessElement(processArgProvider); + invoker.invokeFinishBundle(bundleArgProvider); + + return receiver.getTimestampedOutputs(); + } catch (Exception e) { + throw new CompletionException(e); + } + }, + executor); + + // Assigned to 'unused' to satisfy ErrorProne while preserving parent future for + // cancellation + CompletableFuture<List<TimestampedOutput<OutputT>>> unused = + future.whenComplete( + (res, ex) -> { + lock.lock(); + try { + getItemsInBuffer().decrementAndGet(); + } finally { + lock.unlock(); + } + }); + + activeElements.put(elementId, new InFlightElement<>(future)); + getItemsInBuffer().incrementAndGet(); + return true; + } + + return false; + } finally { + lock.unlock(); + } + } + + private void scheduleItem(KV<K, InputT> element, BoundedWindow window, Instant timestamp) { + boolean done = false; + long sleepTime = INITIAL_BACKOFF_SLEEP_MS; + long totalSleep = 0; + long timeoutMs = timeout.getMillis(); + + while (!done && totalSleep < timeoutMs) { + done = scheduleIfRoom(element, 
window, timestamp, false); + if (!done) { + long sleep = Math.min(maxWaitTime.getMillis(), sleepTime); + if (verboseLogging || totalSleep > BACKPRESSURE_LOG_THRESHOLD_MS) { + LOG.info( + "buffer is full for item {}, {} waiting {} ms. Have waited for {} ms.", + element, + getItemsInBuffer().get(), + sleep, + totalSleep); + } + try { + Thread.sleep(sleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException("Interrupted while waiting for space in buffer", e); + } + + // Prevents long overflow possibility + if (sleepTime < maxWaitTime.getMillis()) { + sleepTime *= 2; + } + + totalSleep += sleep; + } + } + // Timeout: element skips JVM pool but stays in BagState for timer to reschedule later. + } + + private Instant nextTimeToFire(@Nullable K key) { + long seed = (key == null) ? 0 : key.hashCode(); + double fractionalOffset = Math.abs(seed % 1000000) / 1000000.0; + double timerFrequencySec = timerFrequency.getMillis() / 1000.0; + double nowSec = System.currentTimeMillis() / 1000.0; + + double base = Math.floor((nowSec + timerFrequencySec) / timerFrequencySec) * timerFrequencySec; + double offset = fractionalOffset * timerFrequencySec; + + return Instant.ofEpochMilli((long) ((base + offset) * 1000)); + } Review Comment:  The 'nextTimeToFire' method manually calculates the absolute firing time using 'System.currentTimeMillis()'. This makes the class non-deterministic and extremely difficult to test using Beam's mock clocks (like 'TestStream' or 'TestPipeline' with processing time). Instead of manual absolute time calculations, leverage Beam's idiomatic relative timer API: 'timer.offset(duration).setRelative()'. You can still calculate the jitter/stagger duration and apply it as an offset. ```java private Duration getTimerOffset(@Nullable K key) { long seed = (key == null) ? 
0 : key.hashCode(); double fractionalOffset = Math.abs(seed % 1000000) / 1000000.0; long jitterMillis = (long) (fractionalOffset * timerFrequency.getMillis()); return timerFrequency.plus(Duration.millis(jitterMillis)); } ``` ########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps a dofn and converts it from one which process elements synchronously to one + * which processes them asynchronously. + * + * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed + * in parallel and that processing an element will block all other work on that key. In addition + * runners are optimized for latencies less than a few seconds and longer operations can result in + * high retry rates. 
Async should be considered when the default parallelism is not correct and/or + * items are expected to take longer than a few seconds to process. + * + * <p>/* NOTE: 1) The wrapped syncFn requires thread-safety ONLY if BOTH parallelism > 1 AND the + * DoFn is stateful (keeps instance state). 2) Tagged output multi-outputs are unsupported. 3) + * StartBundle/finishBundle are invoked per element so any batching or aggregation logic will not + * behave as expected. + */ +public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class); + + private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10; + private static final int DEFAULT_TIMEOUT_SEC = 1; + private static final int DEFAULT_MAX_WAIT_TIME_MS = 500; + private static final int TEARDOWN_AWAIT_SEC = 5; + private static final int INITIAL_BACKOFF_SLEEP_MS = 10; + private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000; + + @StateId("to_process") + private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec; + + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final DoFn<InputT, OutputT> syncFn; + private final int parallelism; + private final Duration timerFrequency; + private final int maxItemsToBuffer; + private final Duration timeout; + private final Duration maxWaitTime; + private final SerializableFunction<InputT, Object> idFn; + private final boolean useThreadPool; + private final String uuid; + + private transient volatile @Nullable PipelineOptions pipelineOptions; + + // Shared JVM-Wide States (Static Registries) + // Map-backed registry holding shared resources across serialized worker instances. Since runners + // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse. 
+ private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>(); + // activeElements (processingElements) is global JVM memory (all keys) + private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>> + processingElements = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> refCounts = + new ConcurrentHashMap<>(); + + private static final ReentrantLock lock = new ReentrantLock(); + private static final boolean verboseLogging = false; + + private static class TimestampedOutput<T> { + final T value; + final @Nullable Instant timestamp; + + TimestampedOutput(T value, @Nullable Instant timestamp) { + this.value = value; + this.timestamp = timestamp; + } + } + + private static class InFlightElement<OutputT> { + final CompletableFuture<List<TimestampedOutput<OutputT>>> future; + + InFlightElement(CompletableFuture<List<TimestampedOutput<OutputT>>> future) { + this.future = future; + } + } + + // The In-Memory Accumulating Receiver + // Accumulates elements in-memory during asynchronous background worker execution. + // Buffered elements are only committed downstream once the parent task completes successfully + // and the timer fires. 
+ private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { + private final List<TimestampedOutput<T>> outputs = + Collections.synchronizedList(new ArrayList<>()); + + @Override + public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { + return org.apache.beam.sdk.values.WindowedValues.<T>builder() + .setValue(value) + .setTimestamp(Instant.now()) + .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) + .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) + .setReceiver( + windowedValue -> + outputs.add( + new TimestampedOutput<>( + windowedValue.getValue(), windowedValue.getTimestamp()))); + } + + // Bypasses the nested anonymous OutputBuilder instantiation for standard outputs. + // JVM optimization to prevent garbage collection pressure under high pipeline throughput. + @Override + public void output(T output) { + outputs.add(new TimestampedOutput<>(output, null)); + } + + @Override + public void outputWithTimestamp(T output, Instant timestamp) { + outputs.add(new TimestampedOutput<>(output, timestamp)); + } + + public List<T> getOutputs() { + List<T> rawOutputs = new ArrayList<>(); + for (TimestampedOutput<T> out : outputs) { + rawOutputs.add(out.value); + } + return rawOutputs; + } + + public List<TimestampedOutput<T>> getTimestampedOutputs() { + return outputs; + } + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + @Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool) { + this( + syncFn, + parallelism, + timerFrequency, + maxItemsToBuffer, + timeout, + maxWaitTime, + idFn, + useThreadPool, + null); + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + 
@Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool, + @Nullable Coder<KV<K, InputT>> coder) { + this.syncFn = syncFn; + this.parallelism = parallelism; + if (timerFrequency.getMillis() <= 0) { + throw new IllegalArgumentException("timerFrequency must be greater than zero"); + } + this.timerFrequency = timerFrequency; + this.maxItemsToBuffer = + (maxItemsToBuffer != null) + ? maxItemsToBuffer + : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY); + this.timeout = (timeout != null) ? timeout : Duration.standardSeconds(DEFAULT_TIMEOUT_SEC); + this.maxWaitTime = + (maxWaitTime != null) ? maxWaitTime : Duration.millis(DEFAULT_MAX_WAIT_TIME_MS); + this.idFn = + (idFn != null) + ? idFn + : (SerializableFunction<InputT, Object>) + input -> java.util.Objects.requireNonNull(input); + this.useThreadPool = useThreadPool; + this.uuid = UUID.randomUUID().toString(); + this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : StateSpecs.bag(); + } + + private ExecutorService getThreadPool() { + ExecutorService threadPool = pool.get(uuid); + if (threadPool == null) { + throw new IllegalStateException("Thread pool not initialized for UUID: " + uuid); + } + return threadPool; + } + + @SuppressWarnings("unchecked") + private ConcurrentHashMap<Object, InFlightElement<OutputT>> getProcessingElements() { + ConcurrentHashMap<Object, InFlightElement<?>> elements = processingElements.get(uuid); + if (elements == null) { + throw new IllegalStateException("Processing elements map not initialized for UUID: " + uuid); + } + return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) (ConcurrentHashMap<?, ?>) elements; + } + + private AtomicInteger getItemsInBuffer() { + AtomicInteger buffer = itemsInBuffer.get(uuid); + if (buffer == null) { + throw new IllegalStateException("Buffer counter not initialized for UUID: " + uuid); + } + return buffer; + } + + @Setup + public void setup(PipelineOptions options) { + this.pipelineOptions = options; + + // Setup the 
wrapped DoFn + DoFnInvokers.invokerFor(syncFn) + .invokeSetup( + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public PipelineOptions pipelineOptions() { + return options; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Setup"; + } + }); + + if (useThreadPool) { + LOG.info("Using thread pool for asynchronous execution with parallelism {}", parallelism); + } + + lock.lock(); + try { + pool.computeIfAbsent(uuid, k -> Executors.newFixedThreadPool(parallelism)); + processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>()); + itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0)); + refCounts.computeIfAbsent(uuid, k -> new AtomicInteger(0)).incrementAndGet(); + } finally { + lock.unlock(); + } + } + + // Clean up JVM-wide shared resources to prevent thread leaks on the worker + @Teardown + public void teardown() { + DoFnInvokers.invokerFor(syncFn).invokeTeardown(); + ExecutorService threadPool = null; + lock.lock(); + try { + AtomicInteger refCount = refCounts.get(uuid); + if (refCount != null && refCount.decrementAndGet() == 0) { + refCounts.remove(uuid); + threadPool = pool.remove(uuid); + processingElements.remove(uuid); + itemsInBuffer.remove(uuid); + } + } finally { + lock.unlock(); + } + if (threadPool != null) { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(TEARDOWN_AWAIT_SEC, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + } + } catch (InterruptedException e) { + threadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + // Asynchronous Scheduling & Deduplication + // Submits tasks to the background thread pool. If an element with the same ID is already + // in-flight, + // the submission is silently ignored to enforce exactly-once semantics. 
+ private boolean scheduleIfRoom( + KV<K, InputT> element, BoundedWindow window, Instant timestamp, boolean ignoreBuffer) { + lock.lock(); + try { + ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = getProcessingElements(); + Object elementId = idFn.apply(element.getValue()); + + if (activeElements.containsKey(elementId)) { + LOG.info("Item {} already in processing elements", element); + return true; + } + + int currentBuffer = getItemsInBuffer().get(); + if (currentBuffer < maxItemsToBuffer || ignoreBuffer) { + java.util.concurrent.Executor executor = + useThreadPool ? getThreadPool() : java.util.concurrent.ForkJoinPool.commonPool(); + + // Pending asynchronous task that will produce a list of outputs + CompletableFuture<List<TimestampedOutput<OutputT>>> future = + CompletableFuture.supplyAsync( + () -> { + try { + AccumulatingOutputReceiver<OutputT> receiver = + new AccumulatingOutputReceiver<>(); + DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.invokerFor(syncFn); + + DoFnInvoker.ArgumentProvider<InputT, OutputT> bundleArgProvider = + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public PipelineOptions pipelineOptions() { + PipelineOptions options = pipelineOptions; + if (options == null) { + throw new IllegalStateException("PipelineOptions not set"); + } + return options; + } + + @Override + public DoFn<InputT, OutputT>.FinishBundleContext finishBundleContext( + DoFn<InputT, OutputT> doFn) { + return doFn.new FinishBundleContext() { + @Override + public PipelineOptions getPipelineOptions() { + return pipelineOptions(); + } + + @Override + public void output( + OutputT output, Instant timestamp, BoundedWindow window) { + receiver.outputWithTimestamp(output, timestamp); + } + + @Override + public <T> void output( + TupleTag<T> tag, + T output, + Instant timestamp, + BoundedWindow window) { + throw new UnsupportedOperationException( + "Tagged output not supported in " + + "FinishBundleContext for AsyncDoFn"); + } + 
}; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Bundle"; + } + }; + + invoker.invokeStartBundle(bundleArgProvider); + + DoFnInvoker.ArgumentProvider<InputT, OutputT> processArgProvider = + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public InputT element(DoFn<InputT, OutputT> doFn) { + return element.getValue(); + } + + @Override + public OutputReceiver<OutputT> outputReceiver( + DoFn<InputT, OutputT> doFn) { + return receiver; + } + + @Override + public BoundedWindow window() { + return window; + } + + @Override + public Instant timestamp(DoFn<InputT, OutputT> doFn) { + return timestamp; + } + + @Override + public PipelineOptions pipelineOptions() { + PipelineOptions options = pipelineOptions; + if (options == null) { + throw new IllegalStateException("PipelineOptions not set"); + } + return options; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Process"; + } + }; + + invoker.invokeProcessElement(processArgProvider); + invoker.invokeFinishBundle(bundleArgProvider); + + return receiver.getTimestampedOutputs(); + } catch (Exception e) { + throw new CompletionException(e); + } + }, + executor); + + // Assigned to 'unused' to satisfy ErrorProne while preserving parent future for + // cancellation + CompletableFuture<List<TimestampedOutput<OutputT>>> unused = + future.whenComplete( + (res, ex) -> { + lock.lock(); + try { + getItemsInBuffer().decrementAndGet(); + } finally { + lock.unlock(); + } + }); Review Comment:  In the 'whenComplete' callback of the future, the global static 'lock' is acquired solely to decrement 'itemsInBuffer'. Since 'getItemsInBuffer()' returns an 'AtomicInteger', the 'decrementAndGet()' operation is already completely thread-safe, atomic, and lock-free. Acquiring a heavy JVM-wide lock inside an asynchronous completion callback is unnecessary and introduces severe thread contention. 
```java
CompletableFuture<List<TimestampedOutput<OutputT>>> unused =
    future.whenComplete(
        (res, ex) -> {
          getItemsInBuffer().decrementAndGet();
        });
```
########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java: ########## @@ -0,0 +1,715 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps a dofn and converts it from one which process elements synchronously to one + * which processes them asynchronously. + * + * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed + * in parallel and that processing an element will block all other work on that key. In addition + * runners are optimized for latencies less than a few seconds and longer operations can result in + * high retry rates. 
Async should be considered when the default parallelism is not correct and/or + * items are expected to take longer than a few seconds to process. + * + * <p>/* NOTE: 1) The wrapped syncFn requires thread-safety ONLY if BOTH parallelism > 1 AND the + * DoFn is stateful (keeps instance state). 2) Tagged output multi-outputs are unsupported. 3) + * StartBundle/finishBundle are invoked per element so any batching or aggregation logic will not + * behave as expected. + */ +public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class); + + private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10; + private static final int DEFAULT_TIMEOUT_SEC = 1; + private static final int DEFAULT_MAX_WAIT_TIME_MS = 500; + private static final int TEARDOWN_AWAIT_SEC = 5; + private static final int INITIAL_BACKOFF_SLEEP_MS = 10; + private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000; + + @StateId("to_process") + private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec; + + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final DoFn<InputT, OutputT> syncFn; + private final int parallelism; + private final Duration timerFrequency; + private final int maxItemsToBuffer; + private final Duration timeout; + private final Duration maxWaitTime; + private final SerializableFunction<InputT, Object> idFn; + private final boolean useThreadPool; + private final String uuid; + + private transient volatile @Nullable PipelineOptions pipelineOptions; + + // Shared JVM-Wide States (Static Registries) + // Map-backed registry holding shared resources across serialized worker instances. Since runners + // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse. 
+ private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>(); + // activeElements (processingElements) is global JVM memory (all keys) + private static final ConcurrentHashMap<String, ConcurrentHashMap<Object, InFlightElement<?>>> + processingElements = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> refCounts = + new ConcurrentHashMap<>(); + + private static final ReentrantLock lock = new ReentrantLock(); + private static final boolean verboseLogging = false; + + private static class TimestampedOutput<T> { + final T value; + final @Nullable Instant timestamp; + + TimestampedOutput(T value, @Nullable Instant timestamp) { + this.value = value; + this.timestamp = timestamp; + } + } + + private static class InFlightElement<OutputT> { + final CompletableFuture<List<TimestampedOutput<OutputT>>> future; + + InFlightElement(CompletableFuture<List<TimestampedOutput<OutputT>>> future) { + this.future = future; + } + } + + // The In-Memory Accumulating Receiver + // Accumulates elements in-memory during asynchronous background worker execution. + // Buffered elements are only committed downstream once the parent task completes successfully + // and the timer fires. 
+ private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { + private final List<TimestampedOutput<T>> outputs = + Collections.synchronizedList(new ArrayList<>()); + + @Override + public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { + return org.apache.beam.sdk.values.WindowedValues.<T>builder() + .setValue(value) + .setTimestamp(Instant.now()) + .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) + .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) + .setReceiver( + windowedValue -> + outputs.add( + new TimestampedOutput<>( + windowedValue.getValue(), windowedValue.getTimestamp()))); + } + + // Bypasses the nested anonymous OutputBuilder instantiation for standard outputs. + // JVM optimization to prevent garbage collection pressure under high pipeline throughput. + @Override + public void output(T output) { + outputs.add(new TimestampedOutput<>(output, null)); + } + + @Override + public void outputWithTimestamp(T output, Instant timestamp) { + outputs.add(new TimestampedOutput<>(output, timestamp)); + } + + public List<T> getOutputs() { + List<T> rawOutputs = new ArrayList<>(); + for (TimestampedOutput<T> out : outputs) { + rawOutputs.add(out.value); + } + return rawOutputs; + } + + public List<TimestampedOutput<T>> getTimestampedOutputs() { + return outputs; + } + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + @Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool) { + this( + syncFn, + parallelism, + timerFrequency, + maxItemsToBuffer, + timeout, + maxWaitTime, + idFn, + useThreadPool, + null); + } + + public AsyncDoFn( + DoFn<InputT, OutputT> syncFn, + int parallelism, + Duration timerFrequency, + @Nullable Integer maxItemsToBuffer, + @Nullable Duration timeout, + @Nullable Duration maxWaitTime, + 
@Nullable SerializableFunction<InputT, Object> idFn, + boolean useThreadPool, + @Nullable Coder<KV<K, InputT>> coder) { + this.syncFn = syncFn; + this.parallelism = parallelism; + if (timerFrequency.getMillis() <= 0) { + throw new IllegalArgumentException("timerFrequency must be greater than zero"); + } + this.timerFrequency = timerFrequency; + this.maxItemsToBuffer = + (maxItemsToBuffer != null) + ? maxItemsToBuffer + : Math.max(parallelism * 2, DEFAULT_MIN_BUFFER_CAPACITY); + this.timeout = (timeout != null) ? timeout : Duration.standardSeconds(DEFAULT_TIMEOUT_SEC); + this.maxWaitTime = + (maxWaitTime != null) ? maxWaitTime : Duration.millis(DEFAULT_MAX_WAIT_TIME_MS); + this.idFn = + (idFn != null) + ? idFn + : (SerializableFunction<InputT, Object>) + input -> java.util.Objects.requireNonNull(input); + this.useThreadPool = useThreadPool; + this.uuid = UUID.randomUUID().toString(); + this.toProcessSpec = (coder != null) ? StateSpecs.bag(coder) : StateSpecs.bag(); + } + + private ExecutorService getThreadPool() { + ExecutorService threadPool = pool.get(uuid); + if (threadPool == null) { + throw new IllegalStateException("Thread pool not initialized for UUID: " + uuid); + } + return threadPool; + } + + @SuppressWarnings("unchecked") + private ConcurrentHashMap<Object, InFlightElement<OutputT>> getProcessingElements() { + ConcurrentHashMap<Object, InFlightElement<?>> elements = processingElements.get(uuid); + if (elements == null) { + throw new IllegalStateException("Processing elements map not initialized for UUID: " + uuid); + } + return (ConcurrentHashMap<Object, InFlightElement<OutputT>>) (ConcurrentHashMap<?, ?>) elements; + } + + private AtomicInteger getItemsInBuffer() { + AtomicInteger buffer = itemsInBuffer.get(uuid); + if (buffer == null) { + throw new IllegalStateException("Buffer counter not initialized for UUID: " + uuid); + } + return buffer; + } + + @Setup + public void setup(PipelineOptions options) { + this.pipelineOptions = options; + + // Setup the 
wrapped DoFn + DoFnInvokers.invokerFor(syncFn) + .invokeSetup( + new DoFnInvoker.BaseArgumentProvider<InputT, OutputT>() { + @Override + public PipelineOptions pipelineOptions() { + return options; + } + + @Override + public String getErrorContext() { + return "AsyncDoFn/Setup"; + } + }); + + if (useThreadPool) { + LOG.info("Using thread pool for asynchronous execution with parallelism {}", parallelism); + } + + lock.lock(); + try { + pool.computeIfAbsent(uuid, k -> Executors.newFixedThreadPool(parallelism)); + processingElements.computeIfAbsent(uuid, k -> new ConcurrentHashMap<>()); + itemsInBuffer.computeIfAbsent(uuid, k -> new AtomicInteger(0)); + refCounts.computeIfAbsent(uuid, k -> new AtomicInteger(0)).incrementAndGet(); + } finally { + lock.unlock(); + } + } + + // Clean up JVM-wide shared resources to prevent thread leaks on the worker + @Teardown + public void teardown() { + DoFnInvokers.invokerFor(syncFn).invokeTeardown(); + ExecutorService threadPool = null; + lock.lock(); + try { + AtomicInteger refCount = refCounts.get(uuid); + if (refCount != null && refCount.decrementAndGet() == 0) { + refCounts.remove(uuid); + threadPool = pool.remove(uuid); + processingElements.remove(uuid); + itemsInBuffer.remove(uuid); + } + } finally { + lock.unlock(); + } + if (threadPool != null) { + threadPool.shutdown(); + try { + if (!threadPool.awaitTermination(TEARDOWN_AWAIT_SEC, TimeUnit.SECONDS)) { + threadPool.shutdownNow(); + } + } catch (InterruptedException e) { + threadPool.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + } + + // Asynchronous Scheduling & Deduplication + // Submits tasks to the background thread pool. If an element with the same ID is already + // in-flight, + // the submission is silently ignored to enforce exactly-once semantics. 
+ private boolean scheduleIfRoom( + KV<K, InputT> element, BoundedWindow window, Instant timestamp, boolean ignoreBuffer) { + lock.lock(); + try { + ConcurrentHashMap<Object, InFlightElement<OutputT>> activeElements = getProcessingElements(); + Object elementId = idFn.apply(element.getValue()); + + if (activeElements.containsKey(elementId)) { + LOG.info("Item {} already in processing elements", element); + return true; + } + + int currentBuffer = getItemsInBuffer().get(); + if (currentBuffer < maxItemsToBuffer || ignoreBuffer) { + java.util.concurrent.Executor executor = + useThreadPool ? getThreadPool() : java.util.concurrent.ForkJoinPool.commonPool(); + + // Pending asynchronous task that will produce a list of outputs + CompletableFuture<List<TimestampedOutput<OutputT>>> future = + CompletableFuture.supplyAsync( + () -> { + try { + AccumulatingOutputReceiver<OutputT> receiver = + new AccumulatingOutputReceiver<>(); + DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.invokerFor(syncFn); Review Comment:  The wrapped 'syncFn' is invoked concurrently across multiple background threads if 'parallelism > 1'. In Apache Beam, 'DoFn' instances are **not** thread-safe by default; the runner guarantees single-threaded access per instance. If the user's 'DoFn' has any instance variables or non-thread-safe state, concurrent execution will lead to race conditions and silent data corruption.

To ensure thread safety, you should clone the 'syncFn' per thread using a 'ThreadLocal' or a pool of deserialized instances. Since 'DoFn' is 'Serializable', you can serialize it once during 'setup' and deserialize it to create thread-local copies. 
```suggestion
// Use a ThreadLocal to ensure each thread has its own isolated clone of the syncFn
private transient ThreadLocal<DoFn<InputT, OutputT>> threadLocalSyncFn;

// Note: Initialize threadLocalSyncFn in setup() by serializing/deserializing syncFn
DoFn<InputT, OutputT> localFn = threadLocalSyncFn.get();
DoFnInvoker<InputT, OutputT> invoker = DoFnInvokers.invokerFor(localFn);
```
########## sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/AsyncDoFnTest.java: ########## @@ -0,0 +1,945 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.ReadableState; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Tests for verifying async processing structures and logic. */ +@RunWith(JUnit4.class) +public class AsyncDoFnTest implements Serializable { + + @Rule public final transient TestPipeline p = TestPipeline.create(); Review Comment:  The 'TestPipeline p' rule is declared but never used in any of the unit tests. Since all tests execute the 'AsyncDoFn' methods directly via helper methods ('processDirect', 'commitFinishedItemsDirect'), this unused rule should be removed to keep the test suite clean. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
