This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new fa9d19063ba SDF draining in dataflow runner v1 (#37831)
fa9d19063ba is described below
commit fa9d19063ba5eb14e56b45d61c16e6662168fdb6
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Tue Mar 17 19:03:57 2026 +0100
SDF draining in dataflow runner v1 (#37831)
---
...TimeBoundedSplittableProcessElementInvoker.java | 5 +++
.../core/SplittableParDoViaKeyedWorkItems.java | 37 +++++++++++------
.../runners/core/SplittableParDoProcessFnTest.java | 46 +++++++++++++++++++++-
.../org/apache/beam/sdk/transforms/DoFnTester.java | 7 +++-
.../org/apache/beam/sdk/values/WindowedValues.java | 2 +-
.../apache/beam/fn/harness/FnApiDoFnRunner.java | 20 +++++-----
6 files changed, 93 insertions(+), 24 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 26c4979b257..d39801a7558 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -197,6 +197,11 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker<
processContext,
OutputBuilderSuppliers.supplierForElement(element));
}
+ @Override
+ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
+ return processContext.causedByDrain();
+ }
+
@Override
public RestrictionTracker<?, ?> restrictionTracker() {
return processContext.tracker;
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
index a147ebeab8b..3519a74aada 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDoViaKeyedWorkItems.java
@@ -462,7 +462,19 @@ public class SplittableParDoViaKeyedWorkItems {
elementState.readLater();
restrictionState.readLater();
watermarkEstimatorState.readLater();
- elementAndRestriction = KV.of(elementState.read(), restrictionState.read());
+ WindowedValue<InputT> read = elementState.read();
+ if (timer.causedByDrain() == CausedByDrain.CAUSED_BY_DRAIN) {
+ read =
+ WindowedValues.of(
+ read.getValue(),
+ read.getTimestamp(),
+ read.getWindows(),
+ read.getPaneInfo(),
+ read.getRecordId(),
+ read.getRecordOffset(),
+ CausedByDrain.CAUSED_BY_DRAIN);
+ }
+ elementAndRestriction = KV.of(read, restrictionState.read());
watermarkEstimatorStateT = watermarkEstimatorState.read();
}
@@ -574,7 +586,7 @@ public class SplittableParDoViaKeyedWorkItems {
result =
processElementInvoker.invokeProcessElement(
invoker,
- elementAndRestriction.getKey(),
+ elementAndRestriction.getKey(), // windowed value
tracker,
watermarkEstimator,
sideInputMapping);
@@ -598,15 +610,18 @@ public class SplittableParDoViaKeyedWorkItems {
Instant wakeupTime =
timerInternals.currentProcessingTime().plus(result.getContinuation().resumeDelay());
holdState.add(futureOutputWatermark);
- // Set a timer to continue processing this element.
- // todo radoslws@ decide if draining should be set on timer
- timerInternals.setTimer(
- TimerInternals.TimerData.of(
- stateNamespace,
- wakeupTime,
- wakeupTime,
- TimeDomain.PROCESSING_TIME,
- timer == null ? CausedByDrain.NORMAL : timer.causedByDrain()));
+ // Set a timer to continue processing this element, but only when no drain
+ if (timer == null || timer.causedByDrain() == CausedByDrain.NORMAL) {
+ timerInternals.setTimer(
+ TimerInternals.TimerData.of(
+ stateNamespace,
+ wakeupTime,
+ wakeupTime,
+ TimeDomain.PROCESSING_TIME,
+ CausedByDrain.NORMAL));
+ } else {
+ holdState.clear();
+ }
}
private DoFnInvoker.ArgumentProvider<InputT, OutputT> wrapOptionsAsSetup(
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
index 3aebe2840a9..614721761f9 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoProcessFnTest.java
@@ -45,6 +45,7 @@ import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.VoidCoder;
import org.apache.beam.sdk.io.range.OffsetRange;
+import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.testing.ResetDateTimeProvider;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.DoFn;
@@ -61,6 +62,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValueMultiReceiver;
+import org.apache.beam.sdk.values.CausedByDrain;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TimestampedValue;
@@ -223,6 +225,22 @@ public class SplittableParDoProcessFnTest {
"key".getBytes(StandardCharsets.UTF_8),
Collections.singletonList(windowedValue)));
}
+ boolean advanceDrain() throws Exception {
+
+ List<TimerInternals.TimerData> timers = new ArrayList<>();
+ timers.add(
+ TimerInternals.TimerData.of(
+ "",
+ StateNamespaces.window(GlobalWindow.Coder.INSTANCE, GlobalWindow.INSTANCE),
+ new Instant(Long.MAX_VALUE),
+ new Instant(Long.MAX_VALUE),
+ TimeDomain.EVENT_TIME,
+ CausedByDrain.CAUSED_BY_DRAIN));
+ tester.processElement(
+ KeyedWorkItems.timersWorkItem("key".getBytes(StandardCharsets.UTF_8), timers));
+ return true;
+ }
+
/**
* Advances processing time by a given duration and, if any timers fired, performs a non-seed
* {@link DoFn.ProcessElement} call, feeding it the timers.
@@ -334,10 +352,14 @@ public class SplittableParDoProcessFnTest {
public void process(
ProcessContext c,
RestrictionTracker<OffsetRange, Long> tracker,
- ManualWatermarkEstimator<Instant> watermarkEstimator) {
+ ManualWatermarkEstimator<Instant> watermarkEstimator,
+ CausedByDrain causedByDrain) {
for (long i = tracker.currentRestriction().getFrom(); tracker.tryClaim(i); ++i) {
watermarkEstimator.setWatermark(c.element().plus(Duration.standardSeconds(i)));
c.output(String.valueOf(i));
+ if (causedByDrain == CausedByDrain.CAUSED_BY_DRAIN) {
+ c.output("drains");
+ }
}
}
@@ -363,6 +385,28 @@ public class SplittableParDoProcessFnTest {
}
}
+ @Test
+ public void testDrains() throws Exception {
+ DoFn<Instant, String> fn = new WatermarkUpdateFn();
+ Instant base = Instant.now();
+
+ ProcessFnTester<Instant, String, OffsetRange, Long, Instant> tester =
+ new ProcessFnTester<>(
+ base,
+ fn,
+ InstantCoder.of(),
+ SerializableCoder.of(OffsetRange.class),
+ InstantCoder.of(),
+ 3,
+ MAX_BUNDLE_DURATION);
+
+ tester.startElement(base, new OffsetRange(0, 8));
+ assertThat(tester.takeOutputElements(), hasItems("0", "1", "2"));
+ assertEquals(base.plus(Duration.standardSeconds(2)), tester.getWatermarkHold());
+ assertTrue(tester.advanceDrain());
+ assertThat(tester.takeOutputElements(), hasItems("3", "4", "drains", "drains"));
+ }
+
@Test
public void testUpdatesWatermark() throws Exception {
DoFn<Instant, String> fn = new WatermarkUpdateFn();
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 52aea43cc66..a4f3cba2105 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -237,6 +237,11 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
return window;
}
+ @Override
+ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
+ return processContext.causedByDrain();
+ }
+
@Override
public PaneInfo paneInfo(DoFn<InputT, OutputT> doFn) {
return processContext.pane();
@@ -598,7 +603,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable {
@Override
public CausedByDrain causedByDrain() {
- return CausedByDrain.NORMAL;
+ return element.getCausedByDrain();
}
@Override
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
index 6462de5bac9..ebe26be91c9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowedValues.java
@@ -313,7 +313,7 @@ public class WindowedValues {
boolean isGlobal = GlobalWindow.INSTANCE.equals(window);
if (isGlobal && BoundedWindow.TIMESTAMP_MIN_VALUE.equals(timestamp)) {
- return valueInGlobalWindow(value, paneInfo);
+ return new ValueInGlobalWindow<>(value, paneInfo, null, null, causedByDrain);
} else if (isGlobal) {
return new TimestampedValueInGlobalWindow<>(
value, timestamp, paneInfo, null, null, causedByDrain);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
index 1dfa336e35f..de69f49ecc3 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/FnApiDoFnRunner.java
@@ -1804,11 +1804,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
outputTo(consumer, WindowedValues.of(output, timestamp, windows, paneInfo));
}
- @Override
- public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
- return currentElement.causedByDrain();
- }
-
@Override
public State state(String stateId, boolean alwaysFetched) {
StateDeclaration stateDeclaration = doFnSignature.stateDeclarations().get(stateId);
@@ -1860,11 +1855,6 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
currentElement.getTimestamp(),
currentElement.getPaneInfo());
}
-
- @Override
- public CausedByDrain causedByDrain() {
- return currentElement.causedByDrain();
- }
}
/** Provides arguments for a {@link DoFnInvoker} for a non-window observing method. */
@@ -2254,6 +2244,16 @@ public class FnApiDoFnRunner<InputT, RestrictionT, PositionT, WatermarkEstimator
public WatermarkEstimator<?> watermarkEstimator() {
return currentWatermarkEstimator;
}
+
+ @Override
+ public CausedByDrain causedByDrain() {
+ return currentElement.causedByDrain();
+ }
+
+ @Override
+ public CausedByDrain causedByDrain(DoFn<InputT, OutputT> doFn) {
+ return currentElement.causedByDrain();
+ }
}
/**