This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new f22400a21a9 [Dataflow Streaming] Add experiment to use trigger state
to know if a window is new and optimize reads (#37574)
f22400a21a9 is described below
commit f22400a21a9a7367dc53f5ee6d7a6c33ee8f6ea2
Author: Arun Pandian <[email protected]>
AuthorDate: Wed May 20 05:48:56 2026 -0700
[Dataflow Streaming] Add experiment to use trigger state to know if a
window is new and optimize reads (#37574)
If the unstable_not_update_compatible_new_window_optimization experiment is
set, always write trigger state so that its absence can be used to know that
the window has never existed, avoiding reads. This is only performed for
non-merging windows.
Additionally, regardless of the experiment, trigger state is no longer
rewritten if it is unchanged. This logic already existed but was broken due to
a missing equals method implementation.
---
.../apache/beam/runners/core/ReduceFnRunner.java | 49 +++++-
.../apache/beam/runners/core/WatermarkHold.java | 17 +++
.../core/triggers/FinishedTriggersBitSet.java | 14 ++
.../core/triggers/TriggerStateMachineRunner.java | 43 ++++--
.../beam/runners/core/ReduceFnRunnerTest.java | 58 +++++++
.../apache/beam/runners/core/ReduceFnTester.java | 13 ++
.../triggers/TriggerStateMachineRunnerTest.java | 117 +++++++++++++++
.../windmill/state/WindmillWatermarkHold.java | 94 ++++++++----
.../windmill/state/WindmillStateInternalsTest.java | 167 ++++++++++++++++++++-
.../apache/beam/sdk/state/WatermarkHoldState.java | 10 ++
10 files changed, 535 insertions(+), 47 deletions(-)
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 1ae0c52f853..78505f3c65f 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -37,6 +37,7 @@ import
org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.state.CombiningState;
import org.apache.beam.sdk.state.TimeDomain;
@@ -94,6 +95,11 @@ import org.joda.time.Instant;
}) // TODO(https://github.com/apache/beam/issues/20497)
public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
+ // Experiments guarding optimizations in development. No backward
compatibility guarantees.
+ public static final String
UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION =
+ "unstable_not_update_compatible_new_window_optimization";
+ public static final String
UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION =
+ "unstable_disable_watermark_known_empty_optimization";
/**
* The {@link ReduceFnRunner} depends on most aspects of the {@link
WindowingStrategy}.
*
@@ -218,6 +224,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends
BoundedWindow> {
*/
private final NonEmptyPanes<K, W> nonEmptyPanes;
+ private final boolean useNewWindowOptimization;
+ private final boolean disableWatermarkKnownEmptyOptimization;
+
public ReduceFnRunner(
K key,
WindowingStrategy<?, W> windowingStrategy,
@@ -244,6 +253,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends
BoundedWindow> {
this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy,
this.reduceFn);
+ this.useNewWindowOptimization =
+ windowingStrategy.getWindowFn().isNonMerging()
+ && ExperimentalOptions.hasExperiment(
+ options,
UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION);
+
+ this.disableWatermarkKnownEmptyOptimization =
+ ExperimentalOptions.hasExperiment(
+ options, UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION);
+
// Note this may incur I/O to load persisted window set data.
this.activeWindows = createActiveWindowSet();
@@ -263,7 +281,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends
BoundedWindow> {
new TriggerStateMachineRunner<>(
triggerStateMachine,
new TriggerStateMachineContextFactory<>(
- windowingStrategy.getWindowFn(), stateInternals,
activeWindows));
+ windowingStrategy.getWindowFn(), stateInternals,
activeWindows),
+ this.useNewWindowOptimization);
}
private ActiveWindowSet<W> createActiveWindowSet() {
@@ -282,6 +301,16 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends
BoundedWindow> {
return activeWindows.getActiveAndNewWindows().isEmpty();
}
+ @VisibleForTesting
+ TriggerStateMachineRunner<W> getTriggerRunner() {
+ return triggerRunner;
+ }
+
+ @VisibleForTesting
+ ReduceFnContextFactory<K, InputT, OutputT, W> getContextFactory() {
+ return contextFactory;
+ }
+
private Set<W> windowsThatAreOpen(Collection<W> windows) {
Set<W> result = new HashSet<>();
for (W window : windows) {
@@ -626,6 +655,20 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends
BoundedWindow> {
StateStyle.RENAMED,
value.causedByDrain());
+ // TODO: Make sure the NewWindowOptimization does not create unbounded
trigger state
+ // in GlobalWindow
+ if (useNewWindowOptimization &&
triggerRunner.isNew(directContext.state())) {
+ // Blindly clear state to ensure Windmill doesn't do unnecessary reads.
+ // TODO: Instead of the clears here, we could mark these states as
empty locally
+ // in the state cache and/or explicitly tell that the entries are
non-existent via api
+ reduceFn.clearState(renamedContext);
+ paneInfoTracker.clear(directContext.state());
+ if (!disableWatermarkKnownEmptyOptimization) {
+ watermarkHold.setKnownEmpty(renamedContext);
+ }
+ nonEmptyPanes.clearPane(renamedContext.state());
+ }
+
nonEmptyPanes.recordContent(renamedContext.state());
scheduleGarbageCollectionTimer(directContext);
@@ -761,7 +804,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends
BoundedWindow> {
// Perform prefetching of state to determine if the trigger should fire.
if (windowActivation.isGarbageCollection) {
- triggerRunner.prefetchIsClosed(directContext.state());
+ triggerRunner.prefetchFinishedSet(directContext.state());
} else {
triggerRunner.prefetchShouldFire(directContext.window(),
directContext.state());
}
@@ -941,7 +984,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends
BoundedWindow> {
ReduceFn<K, InputT, OutputT, W>.Context directContext,
ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
triggerRunner.prefetchShouldFire(directContext.window(),
directContext.state());
- triggerRunner.prefetchIsClosed(directContext.state());
+ triggerRunner.prefetchFinishedSet(directContext.state());
prefetchOnTrigger(directContext, renamedContext);
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 15ae8dfe5f1..b9185ccfba3 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -466,6 +466,23 @@ class WatermarkHold<W extends BoundedWindow> implements
Serializable {
context.state().access(EXTRA_HOLD_TAG).clear();
}
+ /**
+ * <b><i>For internal use only; no backwards-compatibility
guarantees.</i></b>
+ *
+ * <p>Permit marking the watermark holds as empty locally, without
necessarily clearing them in
+ * the backend.
+ */
+ public void setKnownEmpty(ReduceFn<?, ?, ?, W>.Context context) {
+ WindowTracing.debug(
+ "WatermarkHold.setKnownEmpty: For key:{}; window:{};
inputWatermark:{}; outputWatermark:{}",
+ context.key(),
+ context.window(),
+ timerInternals.currentInputWatermarkTime(),
+ timerInternals.currentOutputWatermarkTime());
+ context.state().access(elementHoldTag).setKnownEmpty();
+ context.state().access(EXTRA_HOLD_TAG).setKnownEmpty();
+ }
+
/** Return the current data hold, or null if none. Does not clear. For
debugging only. */
public @Nullable Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context
context) {
return context.state().access(elementHoldTag).read();
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
index 7eebb4474c6..967e1ef43f0 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
@@ -18,6 +18,7 @@
package org.apache.beam.runners.core.triggers;
import java.util.BitSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
/** A {@link FinishedTriggers} implementation based on an underlying {@link
BitSet}. */
public class FinishedTriggersBitSet implements FinishedTriggers {
@@ -60,4 +61,17 @@ public class FinishedTriggersBitSet implements
FinishedTriggers {
public FinishedTriggersBitSet copy() {
return new FinishedTriggersBitSet((BitSet) bitSet.clone());
}
+
+ @Override
+ public boolean equals(@Nullable Object obj) {
+ if (!(obj instanceof FinishedTriggersBitSet)) {
+ return false;
+ }
+ return bitSet.equals(((FinishedTriggersBitSet) obj).bitSet);
+ }
+
+ @Override
+ public int hashCode() {
+ return bitSet.hashCode();
+ }
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index e3791821b72..90daa5ac75b 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -63,13 +63,16 @@ public class TriggerStateMachineRunner<W extends
BoundedWindow> {
private final ExecutableTriggerStateMachine rootTrigger;
private final TriggerStateMachineContextFactory<W> contextFactory;
+ private final boolean useNewWindowOptimization;
public TriggerStateMachineRunner(
ExecutableTriggerStateMachine rootTrigger,
- TriggerStateMachineContextFactory<W> contextFactory) {
+ TriggerStateMachineContextFactory<W> contextFactory,
+ boolean useNewWindowOptimization) {
checkState(rootTrigger.getTriggerIndex() == 0);
this.rootTrigger = rootTrigger;
this.contextFactory = contextFactory;
+ this.useNewWindowOptimization = useNewWindowOptimization;
}
private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
@@ -81,9 +84,11 @@ public class TriggerStateMachineRunner<W extends
BoundedWindow> {
}
@Nullable BitSet bitSet = state.read();
- return bitSet == null
- ?
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
- : FinishedTriggersBitSet.fromBitSet(bitSet);
+ if (bitSet == null) {
+ return
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+ }
+
+ return FinishedTriggersBitSet.fromBitSet(bitSet);
}
private void clearFinishedBits(ValueState<BitSet> state) {
@@ -99,19 +104,29 @@ public class TriggerStateMachineRunner<W extends
BoundedWindow> {
return
readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
}
- public void prefetchIsClosed(StateAccessor<?> state) {
+ /** Return true if the window is new (no trigger state has ever been
persisted). */
+ public boolean isNew(StateAccessor<?> state) {
+ return isFinishedSetNeeded() && state.access(FINISHED_BITS_TAG).read() ==
null;
+ }
+
+ @VisibleForTesting
+ public BitSet getFinishedBits(StateAccessor<?> state) {
+ return readFinishedBits(state.access(FINISHED_BITS_TAG)).getBitSet();
+ }
+
+ public void prefetchFinishedSet(StateAccessor<?> state) {
if (isFinishedSetNeeded()) {
state.access(FINISHED_BITS_TAG).readLater();
}
}
public void prefetchForValue(W window, StateAccessor<?> state) {
- prefetchIsClosed(state);
+ prefetchFinishedSet(state);
rootTrigger.invokePrefetchOnElement(contextFactory.createPrefetchContext(window,
rootTrigger));
}
public void prefetchShouldFire(W window, StateAccessor<?> state) {
- prefetchIsClosed(state);
+ prefetchFinishedSet(state);
rootTrigger.invokePrefetchShouldFire(contextFactory.createPrefetchContext(window,
rootTrigger));
}
@@ -180,13 +195,23 @@ public class TriggerStateMachineRunner<W extends
BoundedWindow> {
persistFinishedSet(state, finishedSet);
}
- private void persistFinishedSet(
- StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+ @VisibleForTesting
+ void persistFinishedSet(StateAccessor<?> state, FinishedTriggersBitSet
modifiedFinishedSet) {
if (!isFinishedSetNeeded()) {
return;
}
ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+
+ if (useNewWindowOptimization) {
+ if (finishedSetState.read() == null
+ || !readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
+ // Write a value even if the bitset was empty
+ finishedSetState.write(modifiedFinishedSet.getBitSet());
+ }
+ return;
+ }
+
if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
if (modifiedFinishedSet.getBitSet().isEmpty()) {
finishedSetState.clear();
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 85f6573be23..72b379a7c71 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -40,15 +40,19 @@ import static org.mockito.Mockito.withSettings;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
import org.apache.beam.runners.core.triggers.TriggerStateMachine;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.TimeDomain;
@@ -2343,4 +2347,58 @@ public class ReduceFnRunnerTest {
void setValue(int value);
}
+
+ @Test
+ public void testNewWindowOptimization() throws Exception {
+ WindowingStrategy<?, IntervalWindow> strategy =
+ WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+ .withTrigger(AfterPane.elementCountAtLeast(2))
+ .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+ PipelineOptions options = PipelineOptionsFactory.create();
+ options
+ .as(ExperimentalOptions.class)
+ .setExperiments(
+
Collections.singletonList("unstable_not_update_compatible_new_window_optimization"));
+ ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+ ReduceFnTester.nonCombining(strategy, options);
+
+ IntervalWindow window = new IntervalWindow(new Instant(0), new
Instant(10));
+
+ assertTrue(
+ "Window should be new",
+ tester
+ .createRunner()
+ .getTriggerRunner()
+ .isNew(
+ tester.createRunner().getContextFactory().base(window,
StateStyle.DIRECT).state()));
+
+ // 1. First element for a new window.
+ tester.injectElements(TimestampedValue.of(1, new Instant(1)));
+
+ BitSet bitSet =
+ tester
+ .createRunner()
+ .getTriggerRunner()
+ .getFinishedBits(
+ tester.createRunner().getContextFactory().base(window,
StateStyle.DIRECT).state());
+ assertTrue("Bitset should be empty", bitSet.isEmpty());
+ assertFalse("Trigger should not be finished", bitSet.get(0));
+
+ assertFalse(
+ "Window should no longer be new",
+ tester
+ .createRunner()
+ .getTriggerRunner()
+ .isNew(
+ tester.createRunner().getContextFactory().base(window,
StateStyle.DIRECT).state()));
+
+ // 2. Second element for the same window.
+ tester.injectElements(TimestampedValue.of(2, new Instant(2)));
+
+ // Extract output.
+ List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+ // Both elements are emitted in a single pane with the end-of-window timestamp (9).
+ assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1,
2), 9, 0, 10)));
+ }
}
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 2326a1c77d3..fe2c0a25346 100644
---
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -129,6 +129,19 @@ public class ReduceFnTester<InputT, OutputT, W extends
BoundedWindow> {
NullSideInputReader.empty());
}
+ public static <W extends BoundedWindow>
+ ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
+ WindowingStrategy<?, W> windowingStrategy, PipelineOptions options)
throws Exception {
+ return new ReduceFnTester<>(
+ windowingStrategy,
+ TriggerStateMachines.stateMachineForTrigger(
+ TriggerTranslation.toProto(windowingStrategy.getTrigger())),
+ SystemReduceFn.buffering(VarIntCoder.of()),
+ IterableCoder.of(VarIntCoder.of()),
+ options,
+ NullSideInputReader.empty());
+ }
+
/**
* Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy}
and {@link
* TriggerStateMachine}, for mocking the interactions between {@link
ReduceFnRunner} and the
diff --git
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java
new file mode 100644
index 00000000000..81c80753275
--- /dev/null
+++
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.BitSet;
+import org.apache.beam.runners.core.StateAccessor;
+import org.apache.beam.sdk.state.ValueState;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(JUnit4.class)
+public class TriggerStateMachineRunnerTest {
+
+ @Mock private StateAccessor<?> mockState;
+ @Mock private ValueState<BitSet> mockFinishedSetState;
+ @Mock private TriggerStateMachineContextFactory<?> mockContextFactory;
+ @Mock private TriggerStateMachine mockTriggerStateMachine;
+
+ private ExecutableTriggerStateMachine rootTrigger;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ when(mockState.access(TriggerStateMachineRunner.FINISHED_BITS_TAG))
+ .thenReturn((ValueState) mockFinishedSetState);
+ rootTrigger =
ExecutableTriggerStateMachine.create(mockTriggerStateMachine);
+ }
+
+ @Test
+ public void testPersistFinishedSet_emptyAndOptimizationEnabled() throws
Exception {
+ when(mockFinishedSetState.read()).thenReturn(null);
+
+ TriggerStateMachineRunner<?> runner =
+ new TriggerStateMachineRunner<>(
+ rootTrigger,
+ (TriggerStateMachineContextFactory) mockContextFactory,
+ true /* useNewWindowOptimization */);
+
+ FinishedTriggersBitSet modifiedFinishedSet =
FinishedTriggersBitSet.emptyWithCapacity(1);
+
+ runner.persistFinishedSet(mockState, modifiedFinishedSet);
+
+ // Should write empty bitset because optimization is enabled
+ verify(mockFinishedSetState).write(modifiedFinishedSet.getBitSet());
+ }
+
+ @Test
+ public void testPersistFinishedSet_emptyAndOptimizationDisabled() throws
Exception {
+ when(mockFinishedSetState.read()).thenReturn(null);
+
+ TriggerStateMachineRunner<?> runner =
+ new TriggerStateMachineRunner<>(
+ rootTrigger,
+ (TriggerStateMachineContextFactory) mockContextFactory,
+ false /* useNewWindowOptimization */);
+
+ FinishedTriggersBitSet modifiedFinishedSet =
FinishedTriggersBitSet.emptyWithCapacity(1);
+
+ runner.persistFinishedSet(mockState, modifiedFinishedSet);
+
+ // Should NOT write empty bitset because optimization is disabled and it
was already empty (read
+ // returned null)
+ verify(mockFinishedSetState,
never()).write(modifiedFinishedSet.getBitSet());
+ }
+
+ private void runTestPersistFinishedSet_nonEmpty(boolean
useNewWindowOptimization)
+ throws Exception {
+ when(mockFinishedSetState.read()).thenReturn(null);
+
+ TriggerStateMachineRunner<?> runner =
+ new TriggerStateMachineRunner<>(
+ rootTrigger,
+ (TriggerStateMachineContextFactory) mockContextFactory,
+ useNewWindowOptimization);
+
+ FinishedTriggersBitSet modifiedFinishedSet =
FinishedTriggersBitSet.emptyWithCapacity(1);
+ modifiedFinishedSet.setFinished(rootTrigger, true);
+
+ runner.persistFinishedSet(mockState, modifiedFinishedSet);
+
+ // Should write non-empty bitset
+ verify(mockFinishedSetState).write(modifiedFinishedSet.getBitSet());
+ }
+
+ @Test
+ public void testPersistFinishedSet_nonEmpty() throws Exception {
+ runTestPersistFinishedSet_nonEmpty(false);
+ }
+
+ @Test
+ public void testPersistFinishedSet_nonEmptyAndOptimizationEnabled() throws
Exception {
+ runTestPersistFinishedSet_nonEmpty(true);
+ }
+}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
index b426b96cb5b..843a7347bdc 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
@@ -17,6 +17,8 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.state;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
@@ -37,6 +39,7 @@ import org.joda.time.Instant;
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class WindmillWatermarkHold extends WindmillState implements
WatermarkHoldState {
+
// The encoded size of an Instant.
private static final int ENCODED_SIZE = 8;
@@ -46,6 +49,7 @@ public class WindmillWatermarkHold extends WindmillState
implements WatermarkHol
private final String stateFamily;
private boolean cleared = false;
+ private boolean knownEmpty = false;
/**
* If non-{@literal null}, the known current hold value, or absent if we
know there are no output
* watermark holds. If {@literal null}, the current hold value could depend
on holds in Windmill
@@ -81,6 +85,17 @@ public class WindmillWatermarkHold extends WindmillState
implements WatermarkHol
cachedValue = Optional.absent();
}
+ @Override
+ public void setKnownEmpty() {
+ checkState(localAdditions == null, "setKnownEmpty called with local
additions");
+ checkState(!cleared, "setKnownEmpty called after clearing");
+ checkState(
+ cachedValue == null || !cachedValue.isPresent(),
+ "setKnownEmpty called with a cached value");
+ cachedValue = Optional.absent();
+ knownEmpty = true;
+ }
+
@Override
@SuppressWarnings("FutureReturnValueIgnored")
public WindmillWatermarkHold readLater() {
@@ -137,48 +152,67 @@ public class WindmillWatermarkHold extends WindmillState
implements WatermarkHol
Future<Windmill.WorkItemCommitRequest> result;
- if (!cleared && localAdditions == null) {
- // No changes, so no need to update Windmill and no need to cache any
value.
- return
Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
- }
-
- if (cleared && localAdditions == null) {
- // Just clearing the persisted state; blind delete
- Windmill.WorkItemCommitRequest.Builder commitBuilder =
- Windmill.WorkItemCommitRequest.newBuilder();
- commitBuilder
- .addWatermarkHoldsBuilder()
- .setTag(stateKey.byteString())
- .setStateFamily(stateFamily)
- .setReset(true);
+ if (knownEmpty) {
+ if (localAdditions != null) {
+ // 1. We know it's empty, so we can just update with localAdditions
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ commitBuilder
+ .addWatermarkHoldsBuilder()
+ .setTag(stateKey.byteString())
+ .setStateFamily(stateFamily)
+
.addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions));
- result = Futures.immediateFuture(commitBuilder.buildPartial());
- } else if (cleared && localAdditions != null) {
- // Since we cleared before adding, we can do a blind overwrite of
persisted state
- Windmill.WorkItemCommitRequest.Builder commitBuilder =
- Windmill.WorkItemCommitRequest.newBuilder();
- commitBuilder
- .addWatermarkHoldsBuilder()
- .setTag(stateKey.byteString())
- .setStateFamily(stateFamily)
- .setReset(true)
-
.addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions));
+ cachedValue = Optional.of(localAdditions);
+ result = Futures.immediateFuture(commitBuilder.buildPartial());
+ } else {
+ // 2. State is known to be empty and there are no local additions.
+ // Whether 'cleared' was called or not, the desired state is empty.
+ // So no need to update Windmill.
+ result =
+
Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
+ }
+ } else if (cleared) {
+ if (localAdditions == null) {
+ // 3. Just clearing the persisted state; blind delete
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ commitBuilder
+ .addWatermarkHoldsBuilder()
+ .setTag(stateKey.byteString())
+ .setStateFamily(stateFamily)
+ .setReset(true);
- cachedValue = Optional.of(localAdditions);
+ result = Futures.immediateFuture(commitBuilder.buildPartial());
+ } else {
+ // 4. Since we cleared before adding, we can do an overwrite of
persisted state
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ commitBuilder
+ .addWatermarkHoldsBuilder()
+ .setTag(stateKey.byteString())
+ .setStateFamily(stateFamily)
+ .setReset(true)
+
.addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions));
- result = Futures.immediateFuture(commitBuilder.buildPartial());
- } else if (!cleared && localAdditions != null) {
- // Otherwise, we need to combine the local additions with the already
persisted data
+ cachedValue = Optional.of(localAdditions);
+ result = Futures.immediateFuture(commitBuilder.buildPartial());
+ }
+ } else if (localAdditions != null) {
+ // 5. Otherwise, we need to combine the local additions with the already
persisted data
result = combineWithPersisted();
} else {
- throw new IllegalStateException("Unreachable condition");
+ // No changes, so no need to update Windmill and no need to cache any
value.
+ return
Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
}
final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size();
+
return Futures.lazyTransform(
result,
result1 -> {
cleared = false;
+ knownEmpty = false;
localAdditions = null;
if (cachedValue != null) {
cache.put(namespace, stateKey, WindmillWatermarkHold.this,
estimatedByteSize);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index 5fbae493581..87b746089f1 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -3025,18 +3025,175 @@ public class WindmillStateInternalsTest {
StateTag<WatermarkHoldState> addr =
StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
- WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
+ WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
- bag.clear();
- assertThat(bag.read(), Matchers.nullValue());
+ hold.clear();
+ assertThat(hold.read(), Matchers.nullValue());
- bag.add(new Instant(300));
- assertThat(bag.read(), Matchers.equalTo(new Instant(300)));
+ hold.add(new Instant(300));
+ assertThat(hold.read(), Matchers.equalTo(new Instant(300)));
// Shouldn't need to read from windmill because the value is already
available.
Mockito.verifyNoMoreInteractions(mockReader);
}
+ @Test
+ public void testWatermarkSetKnownEmptyBeforeRead() throws Exception {
+ StateTag<WatermarkHoldState> addr =
+ StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
+
+ WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
+
+ hold.setKnownEmpty();
+ assertThat(hold.read(), Matchers.nullValue());
+
+ hold.add(new Instant(300));
+ assertThat(hold.read(), Matchers.equalTo(new Instant(300)));
+
+ // Shouldn't need to read from windmill because the value is already
available.
+ Mockito.verifyNoMoreInteractions(mockReader);
+ }
+
+ @Test
+ public void testWatermarkSetKnownEmptyThenAddPersist() throws Exception {
+ StateTag<WatermarkHoldState> addr =
+ StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
+
+ WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
+
+ hold.setKnownEmpty();
+ hold.add(new Instant(1000));
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+ Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
+ assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
+ assertEquals(TimeUnit.MILLISECONDS.toMicros(1000),
watermarkHold.getTimestamps(0));
+
+ Mockito.verifyNoInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
+ }
+
+ @Test
+ public void testNewWatermarkAddPersist() throws Exception {
+ StateTag<WatermarkHoldState> addr =
+ StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
+
+ WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr);
+
+ hold.add(new Instant(1000));
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTestNewKey.persist(commitBuilder);
+
+ assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+ Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
+ assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
+ assertEquals(TimeUnit.MILLISECONDS.toMicros(1000),
watermarkHold.getTimestamps(0));
+
+ Mockito.verifyNoInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
+ }
+
+ @Test
+ public void testNewWatermarkClearPersist() throws Exception {
+ StateTag<WatermarkHoldState> addr =
+ StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
+
+ WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr);
+
+ hold.add(new Instant(1000));
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTestNewKey.persist(commitBuilder);
+
+ assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+ Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
+ assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
+ assertEquals(TimeUnit.MILLISECONDS.toMicros(1000),
watermarkHold.getTimestamps(0));
+
+ Mockito.verifyNoInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
+ }
+
+ @Test
+ public void testWatermarkSetKnownEmptyThenClearPersist() throws Exception {
+ StateTag<WatermarkHoldState> addr =
+ StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
+
+ WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
+
+ hold.setKnownEmpty();
+ hold.clear();
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ assertEquals(0, commitBuilder.getWatermarkHoldsCount());
+
+ Mockito.verifyNoInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
+ }
+
+ @Test
+ public void testWatermarkSetKnownEmptyThenPersist() throws Exception {
+ StateTag<WatermarkHoldState> addr =
+ StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
+
+ WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
+
+ hold.setKnownEmpty();
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ assertEquals(0, commitBuilder.getWatermarkHoldsCount());
+
+ Mockito.verifyNoInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
+ }
+
+ @Test
+ public void testWatermarkSetKnownEmptyThenClearThenAddPersist() throws
Exception {
+ StateTag<WatermarkHoldState> addr =
+ StateTags.watermarkStateInternal("watermark",
TimestampCombiner.EARLIEST);
+
+ WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
+
+ hold.setKnownEmpty();
+ hold.clear();
+ hold.add(new Instant(1000));
+
+ Windmill.WorkItemCommitRequest.Builder commitBuilder =
+ Windmill.WorkItemCommitRequest.newBuilder();
+ underTest.persist(commitBuilder);
+
+ assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+ Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
+ assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
+ assertEquals(TimeUnit.MILLISECONDS.toMicros(1000),
watermarkHold.getTimestamps(0));
+
+ Mockito.verifyNoInteractions(mockReader);
+
+ assertBuildable(commitBuilder);
+ }
+
@Test
public void testWatermarkPersistEarliest() throws Exception {
StateTag<WatermarkHoldState> addr =
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
index 6d4183da101..f8b09bcf03a 100644
---
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
@@ -38,4 +38,14 @@ public interface WatermarkHoldState extends
GroupingState<Instant, Instant> {
@Override
WatermarkHoldState readLater();
+
+ /**
+ * <b><i>For internal use only; no backwards-compatibility
guarantees.</i></b>
+ *
+ * <p>Permit marking the state as empty locally, without necessarily
clearing it in the backend.
+ *
+ * <p>This may be used by runners to optimize out unnecessary state reads.
+ */
+ @Internal
+ default void setKnownEmpty() {}
}