gemini-code-assist[bot] commented on code in PR #38363:
URL: https://github.com/apache/beam/pull/38363#discussion_r3177871891
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -436,6 +501,9 @@ private void processSystemTimer(TimerData timer) throws
Exception {
public void finishBundle() throws Exception {
if (fnRunner != null) {
fnRunner.finishBundle();
+ if (sideInputFetcher != null) {
+ sideInputFetcher.persist();
+ }
Review Comment:

The `sideInputFetcher` field should be cleared at the end of the bundle.
Since `SimpleParDoFn` instances are typically reused across bundles in the
Dataflow worker, failing to null out this field can carry stale state over
into subsequent bundles or cause a memory leak. Clearing it here ensures that
each bundle starts with a clean state.
```suggestion
if (sideInputFetcher != null) {
sideInputFetcher.persist();
sideInputFetcher = null;
}
```
##########
sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java:
##########
@@ -3678,6 +3679,154 @@ public void processElement(
pipeline.run();
}
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTestStream.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class,
+ UsesOnWindowExpiration.class
+ })
+ public void testTimerSideInput() {
+ // SideInput tag id
+ final String sideInputTag1 = "tag1";
+
+ final PCollectionView<Integer> sideInput =
+ pipeline
+ .apply("CreateSideInput1", Create.of(2))
+ .apply("ViewSideInput1", View.asSingleton());
+
+ DoFn<KV<Integer, Integer>, KV<Integer, Integer>> doFn =
+ new DoFn<KV<Integer, Integer>, KV<Integer, Integer>>() {
+ @TimerId("timer")
+ private final TimerSpec timerSpec =
TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+ @StateId("foo")
+ private final StateSpec<ValueState<Integer>> stateSpec =
StateSpecs.value();
+
+ @ProcessElement
+ public void process(@Timestamp Instant ts, @TimerId("timer") Timer
timer) {
+ timer.align(Duration.standardSeconds(10)).setRelative();
+ }
+
+ @OnTimer("timer")
+ public void onTimer(
+ OutputReceiver<KV<Integer, Integer>> o,
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+
+ @OnWindowExpiration
+ public void onWindowExpiration(
+ @DoFn.SideInput(sideInputTag1) Integer sideInput,
+ OutputReceiver<KV<Integer, Integer>> o,
+ @Key Integer key) {
+ o.output(KV.of(key, sideInput));
+ }
+ };
+
+ final int numTestElements = 10;
+ final Instant now = new Instant(0);
+ TestStream.Builder<KV<Integer, Integer>> builder =
+ TestStream.create(KvCoder.of(VarIntCoder.of(), VarIntCoder.of()))
+ .advanceWatermarkTo(new Instant(0));
+
+ for (int i = 0; i < numTestElements; i++) {
+ builder =
+ builder.addElements(
+ TimestampedValue.of(KV.of(i % 2, i),
now.plus(Duration.millis(i * 1000))));
+ if ((i + 1) % 10 == 0) {
+ builder = builder.advanceWatermarkTo(now.plus(Duration.millis((i +
1) * 1000)));
+ }
+ }
+ List<KV<Integer, Integer>> expected =
+ IntStream.rangeClosed(0, 1)
+ .boxed()
+ .flatMap(i -> ImmutableList.of(KV.of(i, 2), KV.of(i,
2)).stream())
+ .collect(Collectors.toList());
+
+ PCollection<KV<Integer, Integer>> output =
+ pipeline
+ .apply(builder.advanceWatermarkToInfinity())
+ .apply(ParDo.of(doFn).withSideInput(sideInputTag1, sideInput));
+ PAssert.that(output).containsInAnyOrder(expected);
+ pipeline.run();
+ }
+
+ @Test
+ @Category({
+ ValidatesRunner.class,
+ UsesStatefulParDo.class,
+ UsesSideInputs.class,
+ UsesSideInputsInTimer.class,
+ UsesTimersInParDo.class,
+ UsesTriggeredSideInputs.class
+ })
+ public void testSideInputNotReadyTimer() {
+ final String sideInputTag = "tag1";
+
+ // Create a side input that is delayed by 5 seconds using Thread.sleep
+ DoFn<KV<String, String>, String> delayFn =
+ new DoFn<KV<String, String>, String>() {
+ @ProcessElement
+ public void process(OutputReceiver<String> o) throws
InterruptedException {
+ Thread.sleep(java.time.Duration.ofSeconds(15).toMillis());
Review Comment:

Using `Thread.sleep(15000)` in a test is highly discouraged: it significantly
increases the execution time of the test suite and can lead to flakiness.
(Note also that the comment above says the side input is delayed by 5 seconds,
while the sleep is actually 15 seconds.) Consider using a much shorter duration
or, if the test environment allows, a more deterministic approach that uses
`TestStream` to simulate the delayed availability of the side input.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFn.java:
##########
@@ -317,8 +322,38 @@ public <TagT> void output(TupleTag<TagT> tag,
WindowedValue<TagT> output) {
outputManager,
doFnSchemaInformation,
sideInputMapping);
+ if (hasStreamingSideInput) {
+ sideInputFetcher =
+ new StreamingSideInputFetcher<InputT, W>(
+ fnInfo.getSideInputViews(),
+ fnInfo.getInputCoder(),
+ (WindowingStrategy<?, W>) fnInfo.getWindowingStrategy(),
+ (StreamingModeExecutionContext.StreamingModeStepContext)
userStepContext);
+ }
fnRunner.startBundle();
+ if (sideInputFetcher != null) {
+ Set<W> readyWindows = sideInputFetcher.getReadyWindows();
+ Iterable<BagState<WindowedValue<InputT>>> elementsBags =
+ sideInputFetcher.prefetchElements(readyWindows);
+ for (BagState<WindowedValue<InputT>> elementsBag : elementsBags) {
+ Iterable<WindowedValue<InputT>> elements = elementsBag.read();
+ for (WindowedValue<InputT> elem : elements) {
+ fnRunner.processElement(elem);
+ }
+ elementsBag.clear();
+
+ boolean hasState = fnSignature != null &&
!fnSignature.stateDeclarations().isEmpty();
+ if (hasState) {
+ // These elements are now processed. Register cleanup timers for all
the unblocked
+ // windows.
+ registerStateCleanup(
+ (WindowingStrategy<?, W>) getDoFnInfo().getWindowingStrategy(),
+ (Collection<W>) readyWindows);
+ }
Review Comment:

The call to `registerStateCleanup` is inside the loop that iterates over
`elementsBags`. Because it is passed the entire `readyWindows` collection, it
redundantly registers cleanup timers for all unblocked windows on every
iteration of the loop. Move it outside the loop, together with the
loop-invariant `hasState` computation, to avoid the repeated work. This matters
most when multiple windows are unblocked at the same time.
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingSideInputFetcher.java:
##########
@@ -205,7 +201,18 @@ public boolean storeIfBlocked(WindowedValue<InputT> elem) {
}
}
}
- if (blocked != null) {
+ return blocked == null ? Collections.emptySet() : blocked;
+ }
+
+ /** Compute the set of side inputs that are not yet ready for the given main
input window. */
+ public boolean storeIfBlocked(WindowedValue<InputT> elem) {
+ @SuppressWarnings("unchecked")
+ W window = (W) Iterables.getOnlyElement(elem.getWindows());
+
+ Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
+ blockedMap().get(window);
Review Comment:

The call to `blockedMap().get(window)` on line 213 discards its result and
appears to have no side effects. It looks like a leftover from refactoring and
should be removed to keep the code clean.
```suggestion
Set<Windmill.GlobalDataRequest> blocked = checkIfBlocked(window);
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]