gemini-code-assist[bot] commented on code in PR #38363:
URL: https://github.com/apache/beam/pull/38363#discussion_r3179004603
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -317,8 +320,31 @@ public <TagT> void output(TupleTag<TagT> tag,
WindowedValue<TagT> output) {
outputManager,
doFnSchemaInformation,
sideInputMapping);
+ if (hasStreamingSideInput) {
+ sideInputProcessor =
+ new StreamingSideInputProcessor<>(
+ new StreamingSideInputFetcher<InputT, W>(
+ fnInfo.getSideInputViews(),
+ fnInfo.getInputCoder(),
+ (WindowingStrategy<?, W>) fnInfo.getWindowingStrategy(),
+ (StreamingModeExecutionContext.StreamingModeStepContext)
userStepContext));
+ }
fnRunner.startBundle();
+ if (sideInputProcessor != null) {
+ boolean hasState = fnSignature != null &&
!fnSignature.stateDeclarations().isEmpty();
+ Iterable<WindowedValue<InputT>> unblockedElements =
sideInputProcessor.tryUnblockElements();
+ for (WindowedValue<InputT> unblockedElement : unblockedElements) {
+ fnRunner.processElement(unblockedElement);
Review Comment:

The `outputsPerElementTracker` is not notified when processing unblocked
elements in `startBundle`. This can lead to incorrect progress tracking and
metrics in Dataflow, as the tracker expects `onProcessElement()` to be called
before `processElement()`. You should wrap the `processElement` call with the
appropriate tracker notifications.
```java
for (WindowedValue<InputT> unblockedElement : unblockedElements) {
  outputsPerElementTracker.onProcessElement();
  fnRunner.processElement(unblockedElement);
  outputsPerElementTracker.onProcessElementSuccess();
}
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowedValue;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+
+/** Helper class for handling elements blocked on side inputs. */
+class StreamingSideInputProcessor<InputT, W extends BoundedWindow> {
+ private final StreamingSideInputFetcher<InputT, W> sideInputFetcher;
+
+ public StreamingSideInputProcessor(StreamingSideInputFetcher<InputT, W>
sideInputFetcher) {
+ this.sideInputFetcher = sideInputFetcher;
+ }
+
+ /**
+ * Handle's startBundle. If there are unblocked elements, process them and
then return the set of
+ * windows that were unblocked.
+ */
+ Iterable<WindowedValue<InputT>> tryUnblockElements() {
+ sideInputFetcher.prefetchBlockedMap();
+
+ // Find the set of ready windows.
+ Set<W> readyWindows = sideInputFetcher.getReadyWindows();
+
+ Iterable<BagState<WindowedValue<InputT>>> elementsBags =
+ sideInputFetcher.prefetchElements(readyWindows);
+ List<WindowedValue<InputT>> releaseElements = Lists.newArrayList();
+ for (BagState<WindowedValue<InputT>> elementsBag : elementsBags) {
+ Iterable<WindowedValue<InputT>> elements = elementsBag.read();
+ for (WindowedValue<InputT> elem : elements) {
+ releaseElements.add(elem);
+ }
+ elementsBag.clear();
+ }
Review Comment:

Flattening all unblocked elements from all ready windows into a single
`ArrayList` poses a significant memory risk and could lead to
`OutOfMemoryError` if a large number of elements were buffered. The previous
implementation in `StreamingSideInputDoFnRunner` processed elements bag-by-bag,
which is much more memory-efficient. Consider refactoring this to process
elements lazily or bag-by-bag to avoid accumulating everything in memory at
once.
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
##########
@@ -3678,6 +3679,154 @@ public void processElement(
pipeline.run();
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTestStream.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testTimerSideInput() {
+ // SideInput tag id
+ final String sideInputTag1 = "tag1";
+
+ final PCollectionView<Integer> sideInput =
+ pipeline
+ .apply("CreateSideInput1", Create.of(2))
+ .apply("ViewSideInput1", View.asSingleton());
+
+ DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn =
+ new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
+ @TimerId("timer")
+ private final TimerSpec timerSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId("foo")
+ private final StateSpec<ValueState<Integer>> stateSpec =
StateSpecs.value();
+
+ @ProcessElement
+ public void process(@Timestamp Instant ts, @TimerId("timer") Timer
timer) {
+ timer.align(Duration.standardSeconds(10)).setRelative();
+ }
+
+ @OnTimer("timer")
+ public void onTimer(
+ OutputReceiver<KV<Integer, Integer>> o,
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+
+ @OnWindowExpiration
+ public void onWindowExpiration(
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ OutputReceiver<KV<Integer, Integer>> o,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+ };
+
+ final int numTestElements = 10;
+ final Instant now = new Instant(0);
+ TestStream.Builder<KV<Integer, Integer>> builder =
+ TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0));
+
+ for (int i = 0; i < numTestElements; i++) {
+ builder =
+ builder.addElements(
+ TimestampedValue.of(KV.of(i % 2, i),
now.plus(Duration.millis(i * 1000))));
+ if ((i + 1) % 10 == 0) {
+ builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i +
1) * 1000)));
+ }
+ }
+ List<KV<Integer, Integer>> expected =
+ IntStream.rangeClosed(0, 1)
+ .boxed()
+ .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i,
2)).stream())
+ .collect(Collectors.toList());
+
+ PCollection<KV<Integer, Integer>> output =
+ pipeline
+ .apply(builder.advanceWatermarkToInfinity())
+ .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput));
+ PAssert.that(output).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class
+ })
+ public void testSideInputNotReadyTimer() {
+ final String sideInputTag = "tag1";
+
+ // Create a side input that is delayed by 5 seconds using Thread.sleep
+ DoFn<KV<String, String>, String> delayFn =
+ new DoFn<KV<String, String>, String>() {
+ @ProcessElement
+ public void process(OutputReceiver<String> o) throws
InterruptedException {
+ Thread.sleep(java.time.Duration.ofSeconds(15).toMillis());
Review Comment:

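Sleeping for 15 seconds
(`Thread.sleep(java.time.Duration.ofSeconds(15).toMillis())`) makes the test
suite significantly slower, and it disagrees with the code comment just above,
which says the side input is delayed by 5 seconds. For simulating a delayed
side input in these tests, a shorter duration (e.g., 5 seconds) is usually
sufficient and helps maintain a faster CI pipeline.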
```suggestion
Thread.sleep(java.time.Duration.ofSeconds(5).toMillis());
```
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -334,14 +360,28 @@ public void processElement(Object untypedElem) throws
Exception {
WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
- if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
+ boolean hasState = fnSignature != null &&
!fnSignature.stateDeclarations().isEmpty();
+ outputsPerElementTracker.onProcessElement();
+
+ Collection<W> windowsProcessed;
+ if (sideInputProcessor != null) {
+ windowsProcessed = Lists.newArrayList();
+ Iterable<WindowedValue<InputT>> elementsToProcess =
+ sideInputProcessor.handleProcessElement(elem);
+ for (WindowedValue<InputT> toProcess : elementsToProcess) {
+ fnRunner.processElement(toProcess);
+ windowsProcessed.addAll((Collection<W>) toProcess.getWindows());
Review Comment:

The `windowsProcessed` list is allocated and populated even if `hasState` is
false, in which case it is never used. Deferring the allocation and population
of this list until it's known that state cleanup is required would be more
efficient.
```suggestion
windowsProcessed = hasState ? Lists.newArrayList() :
ImmutableList.of();
Iterable<WindowedValue<InputT>> elementsToProcess =
sideInputProcessor.handleProcessElement(elem);
for (WindowedValue<InputT> toProcess : elementsToProcess) {
fnRunner.processElement(toProcess);
if (hasState) {
windowsProcessed.addAll((Collection<W>) toProcess.getWindows());
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]