This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new fa9d19063ba SDF draining in dataflow runner v1 (#37831)
fa9d19063ba is described below

commit fa9d19063ba5eb14e56b45d61c16e6662168fdb6
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Tue Mar 17 19:03:57 2026 +0100

    SDF draining in dataflow runner v1 (#37831)
---
 ...TimeBoundedSplittableProcessElementInvoker.java |  5 +++
 .../core/SplittableParDoViaKeyedWorkItems.java     | 37 +++++++++++------
 .../runners/core/SplittableParDoProcessFnTest.java | 46 +++++++++++++++++++++-
 .../org/apache/beam/sdk/transforms/DoFnTester.java |  7 +++-
 .../org/apache/beam/sdk/values/WindowedValues.java |  2 +-
 .../apache/beam/fn/harness/FnApiDoFnRunner.java    | 20 +++++-----
 6 files changed, 93 insertions(+), 24 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 26c4979b257..d39801a7558 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -197,6 +197,11 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
                     processContext, 
OutputBuilderSuppliers.supplierForElement(element));
               }
 
+              @Override
+              public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
+                return processContext.causedByDrain();
+              }
+
               @Override
               public RestrictionTracker<?, ?> restrictionTracker() {
                 return processContext.tracker;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index a147ebeab8b..3519a74aada 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -462,7 +462,19 @@ public class SplittableParDoViaKeyedWorkItems {
         elementState.readLater();
         restrictionState.readLater();
         watermarkEstimatorState.readLater();
-        elementAndRestriction = KV.of(elementState.read(), 
restrictionState.read());
+        WindowedValue<InputT> read = elementState.read();
+        if (timer.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
+          read =
+              WindowedValues.of(
+                  read.getValue(),
+                  read.getTimestamp(),
+                  read.getWindows(),
+                  read.getPaneInfo(),
+                  read.getRecordId(),
+                  read.getRecordOffset(),
+                  CausedByDrain.CAUSED_BY_DRAIN);
+        }
+        elementAndRestriction = KV.of(read, restrictionState.read());
         watermarkEstimatorStateT = watermarkEstimatorState.read();
       }
 
@@ -574,7 +586,7 @@ public class SplittableParDoViaKeyedWorkItems {
           result =
               processElementInvoker.invokeProcessElement(
                   invoker,
-                  elementAndRestriction.getKey(),
+                  elementAndRestriction.getKey(), // windowed value
                   tracker,
                   watermarkEstimator,
                   sideInputMapping);
@@ -598,15 +610,18 @@ public class SplittableParDoViaKeyedWorkItems {
       Instant wakeupTime =
           
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
       holdState.add(futureOutputWatermark);
-      // Set a timer to continue processing this element.
-      // todo radoslws@ decide if draining should be set on timer
-      timerInternals.setTimer(
-          TimerInternals.TimerData.of(
-              stateNamespace,
-              wakeupTime,
-              wakeupTime,
-              TimeDomain.PROCESSING_TIME,
-              timer == null ? CausedByDrain.NORMAL : timer.causedByDrain()));
+      // Set a timer to continue processing this element, but only when no 
drain
+      if (timer == null || timer.causedByDrain() == CausedByDrain.NORMAL) {
+        timerInternals.setTimer(
+            TimerInternals.TimerData.of(
+                stateNamespace,
+                wakeupTime,
+                wakeupTime,
+                TimeDomain.PROCESSING_TIME,
+                CausedByDrain.NORMAL));
+      } else {
+        holdState.clear();
+      }
     }
 
     private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 3aebe2840a9..614721761f9 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.state.TimeDomain;
 import org.apache.beam.sdk.testing.ResetDateTimeProvider;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -61,6 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TimestampedValue;
@@ -223,6 +225,22 @@ public class SplittableParDoProcessFnTest {
               "key".getBytes(StandardCharsets.UTF_8), 
Collections.singletonList(windowedValue)));
     }
 
+    boolean advanceDrain() throws Exception {
+
+      List<TimerInternals.TimerData> timers = new ArrayList<>();
+      timers.add(
+          TimerInternals.TimerData.of(
+              "",
+              StateNamespaces.window(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.INSTANCE),
+              new Instant(Long.MAX_VALUE),
+              new Instant(Long.MAX_VALUE),
+              TimeDomain.EVENT_TIME,
+              CausedByDrain.CAUSED_BY_DRAIN));
+      tester.processElement(
+          
KeyedWorkItems.timersWorkItem("key".getBytes(StandardCharsets.UTF_8), timers));
+      return true;
+    }
+
     /**
      * Advances processing time by a given duration and, if any timers fired, 
performs a non-seed
      * {@link DoFn.ProcessElement} call, feeding it the timers.
@@ -334,10 +352,14 @@ public class SplittableParDoProcessFnTest {
     public void process(
         ProcessContext c,
         RestrictionTracker<OffsetRange, Long> tracker,
-        ManualWatermarkEstimator<Instant> watermarkEstimator) {
+        ManualWatermarkEstimator<Instant> watermarkEstimator,
+        CausedByDrain causedByDrain) {
       for (long i = tracker.currentRestriction().getFrom(); 
tracker.tryClaim(i); ++i) {
         
watermarkEstimator.setWatermark(c.element().plus(Duration.standardSeconds(i)));
         c.output(String.valueOf(i));
+        if (causedByDrain == CausedByDrain.CAUSED_BY_DRAIN) {
+          c.output("drains");
+        }
       }
     }
 
@@ -363,6 +385,28 @@ public class SplittableParDoProcessFnTest {
     }
   }
 
+  @Test
+  public void testDrains() throws Exception {
+    DoFn<Instant, String> fn = new WatermarkUpdateFn();
+    Instant base = Instant.now();
+
+    ProcessFnTester<Instant, String, OffsetRange, Long, Instant> tester =
+        new ProcessFnTester<>(
+            base,
+            fn,
+            InstantCoder.of(),
+            SerializableCoder.of(OffsetRange.class),
+            InstantCoder.of(),
+            3,
+            MAX_BUNDLE_DURATION);
+
+    tester.startElement(base, new OffsetRange(0, 8));
+    assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
+    assertEquals(base.plus(Duration.standardSeconds(2)), 
tester.getWatermarkHold());
+    assertTrue(tester.advanceDrain());
+    assertThat(tester.takeOutputElements(), hasItems("3", "4", "drains", 
"drains"));
+  }
+
   @Test
   public void testUpdatesWatermark() throws Exception {
     DoFn<Instant, String> fn = new WatermarkUpdateFn();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 52aea43cc66..a4f3cba2105 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -237,6 +237,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
               return window;
             }
 
+            @Override
+            public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
+              return processContext.causedByDrain();
+            }
+
             @Override
             public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
               return processContext.pane();
@@ -598,7 +603,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
 
     @Override
     public CausedByDrain causedByDrain() {
-      return CausedByDrain.NORMAL;
+      return element.getCausedByDrain();
     }
 
     @Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
index 6462de5bac9..ebe26be91c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
@@ -313,7 +313,7 @@ public class WindowedValues {
 
     boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
     if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
-      return valueInGlobalWindow(value, paneInfo);
+      return new ValueInGlobalWindow<>(value, paneInfo, null, null, 
causedByDrain);
     } else if (isGlobal) {
       return new TimestampedValueInGlobalWindow<>(
           value, timestamp, paneInfo, null, null, causedByDrain);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 1dfa336e35f..de69f49ecc3 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1804,11 +1804,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
       outputTo(consumer, WindowedValues.of(output, timestamp, windows, 
paneInfo));
     }
 
-    @Override
-    public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
-      return currentElement.causedByDrain();
-    }
-
     @Override
     public State state(String stateId, boolean alwaysFetched) {
       StateDeclaration stateDeclaration = 
doFnSignature.stateDeclarations().get(stateId);
@@ -1860,11 +1855,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
           currentElement.getTimestamp(),
           currentElement.getPaneInfo());
     }
-
-    @Override
-    public CausedByDrain causedByDrain() {
-      return currentElement.causedByDrain();
-    }
   }
 
   /** Provides arguments for a {@link DoFnInvoker} for a non-window observing 
method. */
@@ -2254,6 +2244,16 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
     public WatermarkEstimator<?> watermarkEstimator() {
       return currentWatermarkEstimator;
     }
+
+    @Override
+    public CausedByDrain causedByDrain() {
+      return currentElement.causedByDrain();
+    }
+
+    @Override
+    public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
+      return currentElement.causedByDrain();
+    }
   }
 
   /**

Reply via email to