gemini-code-assist[bot] commented on code in PR #38363:
URL: https://github.com/apache/beam/pull/38363#discussion_r3179004603


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -317,8 +320,31 @@ public <TagT> void output(TupleTag<TagT> tag, 
WindowedValue<TagT> output) {
             outputManager,
             doFnSchemaInformation,
             sideInputMapping);
+    if (hasStreamingSideInput) {
+      sideInputProcessor =
+          new StreamingSideInputProcessor<>(
+              new StreamingSideInputFetcher<InputT, W>(
+                  fnInfo.getSideInputViews(),
+                  fnInfo.getInputCoder(),
+                  (WindowingStrategy<?, W>) fnInfo.getWindowingStrategy(),
+                  (StreamingModeExecutionContext.StreamingModeStepContext) 
userStepContext));
+    }
 
     fnRunner.startBundle();
+    if (sideInputProcessor != null) {
+      boolean hasState = fnSignature != null && 
!fnSignature.stateDeclarations().isEmpty();
+      Iterable<WindowedValue<InputT>> unblockedElements = 
sideInputProcessor.tryUnblockElements();
+      for (WindowedValue<InputT> unblockedElement : unblockedElements) {
+        fnRunner.processElement(unblockedElement);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `outputsPerElementTracker` is not notified when processing unblocked 
elements in `startBundle`. This can lead to incorrect progress tracking and 
metrics in Dataflow, as the tracker expects `onProcessElement()` to be called 
before `processElement()`. You should wrap the `processElement` call with the 
appropriate tracker notifications.
   
   ```java
         for (WindowedValue<InputT> unblockedElement : unblockedElements) {
           outputsPerElementTracker.onProcessElement();
           fnRunner.processElement(unblockedElement);
           outputsPerElementTracker.onProcessElementSuccess();
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputProcessor.java:
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.WindowedValue;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+
+/** Helper class for handling elements blocked on side inputs. */
+class StreamingSideInputProcessor<InputT, W extends BoundedWindow> {
+  private final StreamingSideInputFetcher<InputT, W> sideInputFetcher;
+
+  public StreamingSideInputProcessor(StreamingSideInputFetcher<InputT, W> 
sideInputFetcher) {
+    this.sideInputFetcher = sideInputFetcher;
+  }
+
+  /**
+   * Handles startBundle. If there are unblocked elements, clears them from their buffers and
+   * returns them so the caller can process them.
+   */
+  Iterable<WindowedValue<InputT>> tryUnblockElements() {
+    sideInputFetcher.prefetchBlockedMap();
+
+    // Find the set of ready windows.
+    Set<W> readyWindows = sideInputFetcher.getReadyWindows();
+
+    Iterable<BagState<WindowedValue<InputT>>> elementsBags =
+        sideInputFetcher.prefetchElements(readyWindows);
+    List<WindowedValue<InputT>> releaseElements = Lists.newArrayList();
+    for (BagState<WindowedValue<InputT>> elementsBag : elementsBags) {
+      Iterable<WindowedValue<InputT>> elements = elementsBag.read();
+      for (WindowedValue<InputT> elem : elements) {
+        releaseElements.add(elem);
+      }
+      elementsBag.clear();
+    }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   Flattening all unblocked elements from all ready windows into a single 
`ArrayList` poses a significant memory risk and could lead to 
`OutOfMemoryError` if a large number of elements were buffered. The previous 
implementation in `StreamingSideInputDoFnRunner` processed elements bag-by-bag, 
which is much more memory-efficient. Consider refactoring this to process 
elements lazily or bag-by-bag to avoid accumulating everything in memory at 
once.



##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
##########
@@ -3678,6 +3679,154 @@ public void processElement(
       pipeline.run();
     }
 
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSideInputs.class,
+      UsesSideInputsInTimer.class,
+      UsesTestStream.class,
+      UsesTimersInParDo.class,
+      UsesTriggeredSideInputs.class,
+      UsesOnWindowExpiration.class
+    })
+    public void testTimerSideInput() {
+      // SideInput tag id
+      final String sideInputTag1 = "tag1";
+
+      final PCollectionView<Integer> sideInput =
+          pipeline
+              .apply("CreateSideInput1", Create.of(2))
+              .apply("ViewSideInput1", View.asSingleton());
+
+      DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn =
+          new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
+            @TimerId("timer")
+            private final TimerSpec timerSpec = 
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+            @StateId("foo")
+            private final StateSpec<ValueState<Integer>> stateSpec = 
StateSpecs.value();
+
+            @ProcessElement
+            public void process(@Timestamp Instant ts, @TimerId("timer") Timer 
timer) {
+              timer.align(Duration.standardSeconds(10)).setRelative();
+            }
+
+            @OnTimer("timer")
+            public void onTimer(
+                OutputReceiver<KV<Integer, Integer>> o,
+                @DoFn.SideInput(sideInputTag1) Integer sideInput,
+                @Key Integer key) {
+              o.output(KV.of(key, sideInput));
+            }
+
+            @OnWindowExpiration
+            public void onWindowExpiration(
+                @DoFn.SideInput(sideInputTag1) Integer sideInput,
+                OutputReceiver<KV<Integer, Integer>> o,
+                @Key Integer key) {
+              o.output(KV.of(key, sideInput));
+            }
+          };
+
+      final int numTestElements = 10;
+      final Instant now = new Instant(0);
+      TestStream.Builder<KV<Integer, Integer>> builder =
+          TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()))
+              .advanceWatermarkTo(new Instant(0));
+
+      for (int i = 0; i < numTestElements; i++) {
+        builder =
+            builder.addElements(
+                TimestampedValue.of(KV.of(i % 2, i), 
now.plus(Duration.millis(i * 1000))));
+        if ((i + 1) % 10 == 0) {
+          builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i + 
1) * 1000)));
+        }
+      }
+      List<KV<Integer, Integer>> expected =
+          IntStream.rangeClosed(0, 1)
+              .boxed()
+              .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i, 
2)).stream())
+              .collect(Collectors.toList());
+
+      PCollection<KV<Integer, Integer>> output =
+          pipeline
+              .apply(builder.advanceWatermarkToInfinity())
+              .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput));
+      PAssert.that(output).containsInAnyOrder(expected);
+      pipeline.run();
+    }
+
+    @Test
+    @Category({
+      ValidatesRunner.class,
+      UsesStatefulParDo.class,
+      UsesSideInputs.class,
+      UsesSideInputsInTimer.class,
+      UsesTimersInParDo.class,
+      UsesTriggeredSideInputs.class
+    })
+    public void testSideInputNotReadyTimer() {
+      final String sideInputTag = "tag1";
+
+      // Create a side input that is delayed by 5 seconds using Thread.sleep
+      DoFn<KV<String, String>, String> delayFn =
+          new DoFn<KV<String, String>, String>() {
+            @ProcessElement
+            public void process(OutputReceiver<String> o) throws 
InterruptedException {
+              Thread.sleep(java.time.Duration.ofSeconds(15).toMillis());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `Thread.sleep(15000)` makes the test suite significantly slower. For 
simulating a delayed side input in these tests, a shorter duration (e.g., 5 
seconds) is usually sufficient and helps maintain a faster CI pipeline.
   
   ```suggestion
                 Thread.sleep(java.time.Duration.ofSeconds(5).toMillis());
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -334,14 +360,28 @@ public void processElement(Object untypedElem) throws 
Exception {
 
     WindowedValue<InputT> elem = (WindowedValue<InputT>) untypedElem;
 
-    if (fnSignature != null && fnSignature.stateDeclarations().size() > 0) {
+    boolean hasState = fnSignature != null && 
!fnSignature.stateDeclarations().isEmpty();
+    outputsPerElementTracker.onProcessElement();
+
+    Collection<W> windowsProcessed;
+    if (sideInputProcessor != null) {
+      windowsProcessed = Lists.newArrayList();
+      Iterable<WindowedValue<InputT>> elementsToProcess =
+          sideInputProcessor.handleProcessElement(elem);
+      for (WindowedValue<InputT> toProcess : elementsToProcess) {
+        fnRunner.processElement(toProcess);
+        windowsProcessed.addAll((Collection<W>) toProcess.getWindows());

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The `windowsProcessed` list is allocated and populated even if `hasState` is 
false, in which case it is never used. Deferring the allocation and population 
of this list until it's known that state cleanup is required would be more 
efficient.
   
   ```suggestion
         windowsProcessed = hasState ? Lists.newArrayList() : 
ImmutableList.of();
         Iterable<WindowedValue<InputT>> elementsToProcess =
             sideInputProcessor.handleProcessElement(elem);
         for (WindowedValue<InputT> toProcess : elementsToProcess) {
           fnRunner.processElement(toProcess);
           if (hasState) {
             windowsProcessed.addAll((Collection<W>) toProcess.getWindows());
           }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to