gemini-code-assist[bot] commented on code in PR #38609: URL: https://github.com/apache/beam/pull/38609#discussion_r3291646007
########## sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/AsyncDoFn.java: ########## @@ -0,0 +1,689 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Random; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.state.BagState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import 
org.apache.beam.sdk.transforms.reflect.DoFnInvoker; +import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TupleTag; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that wraps a dofn and converts it from one which process elements synchronously to one + * which processes them asynchronously. + * + * <p>For synchronous dofns the default settings mean that many (100s) of elements will be processed + * in parallel and that processing an element will block all other work on that key. In addition + * runners are optimized for latencies less than a few seconds and longer operations can result in + * high retry rates. Async should be considered when the default parallelism is not correct and/or + * items are expected to take longer than a few seconds to process. 
+ */ +public class AsyncDoFn<K, InputT, OutputT> extends DoFn<KV<K, InputT>, OutputT> { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncDoFn.class); + + private static final int DEFAULT_MIN_BUFFER_CAPACITY = 10; + private static final int DEFAULT_TIMEOUT_SEC = 1; + private static final int DEFAULT_MAX_WAIT_TIME_MS = 500; + private static final int TEARDOWN_AWAIT_SEC = 5; + private static final int INITIAL_BACKOFF_SLEEP_MS = 10; + private static final int BACKPRESSURE_LOG_THRESHOLD_MS = 10000; + + @StateId("to_process") + private final StateSpec<BagState<KV<K, InputT>>> toProcessSpec; + + @TimerId("timer") + private final TimerSpec timerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + private final DoFn<InputT, OutputT> syncFn; + private final int parallelism; + private final Duration timerFrequency; + private final int maxItemsToBuffer; + private final Duration timeout; + private final Duration maxWaitTime; + private final SerializableFunction<InputT, Object> idFn; + private final boolean useThreadPool; + private final String uuid; + + private transient @Nullable PipelineOptions pipelineOptions; + + // Shared JVM-Wide States (Static Registries) + // Map-backed registry holding shared resources across serialized worker instances. Since runners + // clone DoFn instances on the same worker node, static maps ensure safe JVM-wide resource reuse. 
+ private static final ConcurrentHashMap<String, ExecutorService> pool = new ConcurrentHashMap<>(); + // activeElements (processingElements) is global JVM memory (all keys) + private static final ConcurrentHashMap< + String, ConcurrentHashMap<Object, InFlightElement<?, ?, ?>>> + processingElements = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap<String, AtomicInteger> itemsInBuffer = + new ConcurrentHashMap<>(); + + private static final ReentrantLock lock = new ReentrantLock(); + private static final boolean verboseLogging = false; + + private static class InFlightElement<K, InputT, OutputT> { + final KV<K, InputT> element; + final CompletableFuture<List<OutputT>> future; + + InFlightElement(KV<K, InputT> element, CompletableFuture<List<OutputT>> future) { + this.element = element; + this.future = future; + } + } + + // The In-Memory Accumulating Receiver + // Accumulates elements in-memory during asynchronous background worker execution. + // Buffered elements are only committed downstream once the parent task completes successfully + // and the timer fires. + private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { + private final List<T> outputs = Collections.synchronizedList(new ArrayList<>()); + + @Override + public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { + return org.apache.beam.sdk.values.WindowedValues.<T>builder() + .setValue(value) + .setTimestamp(Instant.now()) + .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) + .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) + .setReceiver(windowedValue -> outputs.add(windowedValue.getValue())); + } + + // Bypasses the nested anonymous OutputBuilder instantiation for standard outputs. + // JVM optimization to prevent garbage collection pressure under high pipeline throughput. 
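+ @Override + public void output(T output) { + outputs.add(output); + } + + @Override + public void outputWithTimestamp(T output, Instant timestamp) { + outputs.add(output); + } Review Comment: There is a timestamp loss in `AccumulatingOutputReceiver`: `outputWithTimestamp` currently discards its `timestamp` argument. To fix it, capture the timestamp that the wrapped `syncFn` supplies while it runs, and propagate that timestamp when the buffered results are emitted. `OutputReceiver` already exposes `outputWithTimestamp(T, Instant)`, so the interface is not the problem; the buffer is. Each output should be stored together with its timestamp rather than as a bare value. Beam already provides a container for this, `org.apache.beam.sdk.values.TimestampedValue` (`TimestampedValue.of(value, timestamp)`). Reusing it avoids declaring a nested class that shadows that name, which the suggestion below does. Also note that `output(T)` is specified to use the input element's timestamp. Recording `Instant.now()` for it, as the suggestion below does, should be treated as a placeholder; the element's timestamp should be passed into the receiver when it is created. On the emitting side, the `@OnTimer` method is not limited to untimestamped output. `OnTimerContext` extends `WindowedContext`, which provides `outputWithTimestamp`, and an `@OnTimer` method can also declare an `OutputReceiver<OutputT>` parameter. However, Beam may reject output timestamps that are earlier than the timer's output timestamp unless `getAllowedTimestampSkew()` permits it. The timer may therefore need an explicit output timestamp (e.g. via `Timer.withOutputTimestamp`) that holds back to the earliest buffered element. If the downstream `DoFn` also depends on window or pane information, be aware that the `builder(...)` path currently hard-codes `GlobalWindow.INSTANCE` and `PaneInfo.NO_FIRING`. Preserving those would require a more fundamental change to how `AsyncDoFn` emits output. Casting the receiver to an implementation-specific type to gain timestamped output is not recommended, because it breaks the `OutputReceiver` abstraction. Where possible, let the `syncFn` keep owning its windowing/timestamping logic, and have `AsyncDoFn` faithfully preserve what the `syncFn` produces. 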
```suggestion private static class TimestampedValue<T> { final T value; final Instant timestamp; TimestampedValue(T value, Instant timestamp) { this.value = value; this.timestamp = timestamp; } } private static class AccumulatingOutputReceiver<T> implements OutputReceiver<T> { private final List<TimestampedValue<T>> outputs = Collections.synchronizedList(new ArrayList<>()); @Override public org.apache.beam.sdk.values.OutputBuilder<T> builder(T value) { return org.apache.beam.sdk.values.WindowedValues.<T>builder() .setValue(value) .setTimestamp(Instant.now()) .setWindows(java.util.Collections.singletonList(GlobalWindow.INSTANCE)) .setPaneInfo(org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING) .setReceiver(windowedValue -> outputs.add(new TimestampedValue<>(windowedValue.getValue(), windowedValue.getTimestamp()))); } @Override public void output(T output) { outputs.add(new TimestampedValue<>(output, Instant.now())); } @Override public void outputWithTimestamp(T output, Instant timestamp) { outputs.add(new TimestampedValue<>(output, timestamp)); } public List<TimestampedValue<T>> getOutputs() { return outputs; } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
