This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new f22400a21a9 [Dataflow Streaming] Add experiment to use trigger state 
to know if a window is new and optimize reads (#37574)
f22400a21a9 is described below

commit f22400a21a9a7367dc53f5ee6d7a6c33ee8f6ea2
Author: Arun Pandian <[email protected]>
AuthorDate: Wed May 20 05:48:56 2026 -0700

    [Dataflow Streaming] Add experiment to use trigger state to know if a 
window is new and optimize reads (#37574)
    
    If the unstable_not_update_compatible_new_window_optimization experiment is
    set, always write trigger state so that its absence can be used to know that
    the window has never existed, which avoids unnecessary reads. This is only
    performed for non-merging windows.
    
    Additionally, in all cases, trigger state is no longer rewritten when it is
    unchanged. This logic already existed but was broken due to a missing
    equals method implementation in FinishedTriggersBitSet.
---
 .../apache/beam/runners/core/ReduceFnRunner.java   |  49 +++++-
 .../apache/beam/runners/core/WatermarkHold.java    |  17 +++
 .../core/triggers/FinishedTriggersBitSet.java      |  14 ++
 .../core/triggers/TriggerStateMachineRunner.java   |  43 ++++--
 .../beam/runners/core/ReduceFnRunnerTest.java      |  58 +++++++
 .../apache/beam/runners/core/ReduceFnTester.java   |  13 ++
 .../triggers/TriggerStateMachineRunnerTest.java    | 117 +++++++++++++++
 .../windmill/state/WindmillWatermarkHold.java      |  94 ++++++++----
 .../windmill/state/WindmillStateInternalsTest.java | 167 ++++++++++++++++++++-
 .../apache/beam/sdk/state/WatermarkHoldState.java  |  10 ++
 10 files changed, 535 insertions(+), 47 deletions(-)

diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
index 1ae0c52f853..78505f3c65f 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnRunner.java
@@ -37,6 +37,7 @@ import 
org.apache.beam.runners.core.triggers.TriggerStateMachineContextFactory;
 import org.apache.beam.runners.core.triggers.TriggerStateMachineRunner;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.CombiningState;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -94,6 +95,11 @@ import org.joda.time.Instant;
 }) // TODO(https://github.com/apache/beam/issues/20497)
 public class ReduceFnRunner<K, InputT, OutputT, W extends BoundedWindow> {
 
+  // Experiments guarding optimizations in development. No backward 
compatibility guarantees.
+  public static final String 
UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION =
+      "unstable_not_update_compatible_new_window_optimization";
+  public static final String 
UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION =
+      "unstable_disable_watermark_known_empty_optimization";
   /**
    * The {@link ReduceFnRunner} depends on most aspects of the {@link 
WindowingStrategy}.
    *
@@ -218,6 +224,9 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
    */
   private final NonEmptyPanes<K, W> nonEmptyPanes;
 
+  private final boolean useNewWindowOptimization;
+  private final boolean disableWatermarkKnownEmptyOptimization;
+
   public ReduceFnRunner(
       K key,
       WindowingStrategy<?, W> windowingStrategy,
@@ -244,6 +253,15 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
 
     this.nonEmptyPanes = NonEmptyPanes.create(this.windowingStrategy, 
this.reduceFn);
 
+    this.useNewWindowOptimization =
+        windowingStrategy.getWindowFn().isNonMerging()
+            && ExperimentalOptions.hasExperiment(
+                options, 
UNSTABLE_NOT_UPDATE_COMPATIBLE_NEW_WINDOW_OPTIMIZATION);
+
+    this.disableWatermarkKnownEmptyOptimization =
+        ExperimentalOptions.hasExperiment(
+            options, UNSTABLE_DISABLE_WATERMARK_KNOWN_EMPTY_OPTIMIZATION);
+
     // Note this may incur I/O to load persisted window set data.
     this.activeWindows = createActiveWindowSet();
 
@@ -263,7 +281,8 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
         new TriggerStateMachineRunner<>(
             triggerStateMachine,
             new TriggerStateMachineContextFactory<>(
-                windowingStrategy.getWindowFn(), stateInternals, 
activeWindows));
+                windowingStrategy.getWindowFn(), stateInternals, 
activeWindows),
+            this.useNewWindowOptimization);
   }
 
   private ActiveWindowSet<W> createActiveWindowSet() {
@@ -282,6 +301,16 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
     return activeWindows.getActiveAndNewWindows().isEmpty();
   }
 
+  @VisibleForTesting
+  TriggerStateMachineRunner<W> getTriggerRunner() {
+    return triggerRunner;
+  }
+
+  @VisibleForTesting
+  ReduceFnContextFactory<K, InputT, OutputT, W> getContextFactory() {
+    return contextFactory;
+  }
+
   private Set<W> windowsThatAreOpen(Collection<W> windows) {
     Set<W> result = new HashSet<>();
     for (W window : windows) {
@@ -626,6 +655,20 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
               StateStyle.RENAMED,
               value.causedByDrain());
 
+      // TODO: Make sure the NewWindowOptimization does not create unbounded 
trigger state
+      // in GlobalWindow
+      if (useNewWindowOptimization && 
triggerRunner.isNew(directContext.state())) {
+        // Blindly clear state to ensure Windmill doesn't do unnecessary reads.
+        // TODO: Instead of the clears here, we could mark these states as 
empty locally
+        //  in the state cache and/or explicitly tell that the entries are 
non-existent via api
+        reduceFn.clearState(renamedContext);
+        paneInfoTracker.clear(directContext.state());
+        if (!disableWatermarkKnownEmptyOptimization) {
+          watermarkHold.setKnownEmpty(renamedContext);
+        }
+        nonEmptyPanes.clearPane(renamedContext.state());
+      }
+
       nonEmptyPanes.recordContent(renamedContext.state());
       scheduleGarbageCollectionTimer(directContext);
 
@@ -761,7 +804,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
 
       // Perform prefetching of state to determine if the trigger should fire.
       if (windowActivation.isGarbageCollection) {
-        triggerRunner.prefetchIsClosed(directContext.state());
+        triggerRunner.prefetchFinishedSet(directContext.state());
       } else {
         triggerRunner.prefetchShouldFire(directContext.window(), 
directContext.state());
       }
@@ -941,7 +984,7 @@ public class ReduceFnRunner<K, InputT, OutputT, W extends 
BoundedWindow> {
       ReduceFn<K, InputT, OutputT, W>.Context directContext,
       ReduceFn<K, InputT, OutputT, W>.Context renamedContext) {
     triggerRunner.prefetchShouldFire(directContext.window(), 
directContext.state());
-    triggerRunner.prefetchIsClosed(directContext.state());
+    triggerRunner.prefetchFinishedSet(directContext.state());
     prefetchOnTrigger(directContext, renamedContext);
   }
 
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
index 15ae8dfe5f1..b9185ccfba3 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/WatermarkHold.java
@@ -466,6 +466,23 @@ class WatermarkHold<W extends BoundedWindow> implements 
Serializable {
     context.state().access(EXTRA_HOLD_TAG).clear();
   }
 
+  /**
+   * <b><i>For internal use only; no backwards-compatibility 
guarantees.</i></b>
+   *
+   * <p>Permit marking the watermark holds as empty locally, without 
necessarily clearing them in
+   * the backend.
+   */
+  public void setKnownEmpty(ReduceFn<?, ?, ?, W>.Context context) {
+    WindowTracing.debug(
+        "WatermarkHold.setKnownEmpty: For key:{}; window:{}; 
inputWatermark:{}; outputWatermark:{}",
+        context.key(),
+        context.window(),
+        timerInternals.currentInputWatermarkTime(),
+        timerInternals.currentOutputWatermarkTime());
+    context.state().access(elementHoldTag).setKnownEmpty();
+    context.state().access(EXTRA_HOLD_TAG).setKnownEmpty();
+  }
+
   /** Return the current data hold, or null if none. Does not clear. For 
debugging only. */
   public @Nullable Instant getDataCurrent(ReduceFn<?, ?, ?, W>.Context 
context) {
     return context.state().access(elementHoldTag).read();
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
index 7eebb4474c6..967e1ef43f0 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/FinishedTriggersBitSet.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.core.triggers;
 
 import java.util.BitSet;
+import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** A {@link FinishedTriggers} implementation based on an underlying {@link 
BitSet}. */
 public class FinishedTriggersBitSet implements FinishedTriggers {
@@ -60,4 +61,17 @@ public class FinishedTriggersBitSet implements 
FinishedTriggers {
   public FinishedTriggersBitSet copy() {
     return new FinishedTriggersBitSet((BitSet) bitSet.clone());
   }
+
+  @Override
+  public boolean equals(@Nullable Object obj) {
+    if (!(obj instanceof FinishedTriggersBitSet)) {
+      return false;
+    }
+    return bitSet.equals(((FinishedTriggersBitSet) obj).bitSet);
+  }
+
+  @Override
+  public int hashCode() {
+    return bitSet.hashCode();
+  }
 }
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
index e3791821b72..90daa5ac75b 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunner.java
@@ -63,13 +63,16 @@ public class TriggerStateMachineRunner<W extends 
BoundedWindow> {
 
   private final ExecutableTriggerStateMachine rootTrigger;
   private final TriggerStateMachineContextFactory<W> contextFactory;
+  private final boolean useNewWindowOptimization;
 
   public TriggerStateMachineRunner(
       ExecutableTriggerStateMachine rootTrigger,
-      TriggerStateMachineContextFactory<W> contextFactory) {
+      TriggerStateMachineContextFactory<W> contextFactory,
+      boolean useNewWindowOptimization) {
     checkState(rootTrigger.getTriggerIndex() == 0);
     this.rootTrigger = rootTrigger;
     this.contextFactory = contextFactory;
+    this.useNewWindowOptimization = useNewWindowOptimization;
   }
 
   private FinishedTriggersBitSet readFinishedBits(ValueState<BitSet> state) {
@@ -81,9 +84,11 @@ public class TriggerStateMachineRunner<W extends 
BoundedWindow> {
     }
 
     @Nullable BitSet bitSet = state.read();
-    return bitSet == null
-        ? 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree())
-        : FinishedTriggersBitSet.fromBitSet(bitSet);
+    if (bitSet == null) {
+      return 
FinishedTriggersBitSet.emptyWithCapacity(rootTrigger.getFirstIndexAfterSubtree());
+    }
+
+    return FinishedTriggersBitSet.fromBitSet(bitSet);
   }
 
   private void clearFinishedBits(ValueState<BitSet> state) {
@@ -99,19 +104,29 @@ public class TriggerStateMachineRunner<W extends 
BoundedWindow> {
     return 
readFinishedBits(state.access(FINISHED_BITS_TAG)).isFinished(rootTrigger);
   }
 
-  public void prefetchIsClosed(StateAccessor<?> state) {
+  /** Return true if the window is new (no trigger state has ever been 
persisted). */
+  public boolean isNew(StateAccessor<?> state) {
+    return isFinishedSetNeeded() && state.access(FINISHED_BITS_TAG).read() == 
null;
+  }
+
+  @VisibleForTesting
+  public BitSet getFinishedBits(StateAccessor<?> state) {
+    return readFinishedBits(state.access(FINISHED_BITS_TAG)).getBitSet();
+  }
+
+  public void prefetchFinishedSet(StateAccessor<?> state) {
     if (isFinishedSetNeeded()) {
       state.access(FINISHED_BITS_TAG).readLater();
     }
   }
 
   public void prefetchForValue(W window, StateAccessor<?> state) {
-    prefetchIsClosed(state);
+    prefetchFinishedSet(state);
     
rootTrigger.invokePrefetchOnElement(contextFactory.createPrefetchContext(window,
 rootTrigger));
   }
 
   public void prefetchShouldFire(W window, StateAccessor<?> state) {
-    prefetchIsClosed(state);
+    prefetchFinishedSet(state);
     
rootTrigger.invokePrefetchShouldFire(contextFactory.createPrefetchContext(window,
 rootTrigger));
   }
 
@@ -180,13 +195,23 @@ public class TriggerStateMachineRunner<W extends 
BoundedWindow> {
     persistFinishedSet(state, finishedSet);
   }
 
-  private void persistFinishedSet(
-      StateAccessor<?> state, FinishedTriggersBitSet modifiedFinishedSet) {
+  @VisibleForTesting
+  void persistFinishedSet(StateAccessor<?> state, FinishedTriggersBitSet 
modifiedFinishedSet) {
     if (!isFinishedSetNeeded()) {
       return;
     }
 
     ValueState<BitSet> finishedSetState = state.access(FINISHED_BITS_TAG);
+
+    if (useNewWindowOptimization) {
+      if (finishedSetState.read() == null
+          || !readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
+        // Write a value even if the bitset was empty
+        finishedSetState.write(modifiedFinishedSet.getBitSet());
+      }
+      return;
+    }
+
     if (!readFinishedBits(finishedSetState).equals(modifiedFinishedSet)) {
       if (modifiedFinishedSet.getBitSet().isEmpty()) {
         finishedSetState.clear();
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 85f6573be23..72b379a7c71 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -40,15 +40,19 @@ import static org.mockito.Mockito.withSettings;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.BitSet;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.concurrent.ThreadLocalRandom;
+import org.apache.beam.runners.core.ReduceFnContextFactory.StateStyle;
 import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
 import org.apache.beam.runners.core.triggers.DefaultTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.metrics.MetricName;
 import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.options.ExperimentalOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.state.TimeDomain;
@@ -2343,4 +2347,58 @@ public class ReduceFnRunnerTest {
 
     void setValue(int value);
   }
+
+  @Test
+  public void testNewWindowOptimization() throws Exception {
+    WindowingStrategy<?, IntervalWindow> strategy =
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(10)))
+            .withTrigger(AfterPane.elementCountAtLeast(2))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+    PipelineOptions options = PipelineOptionsFactory.create();
+    options
+        .as(ExperimentalOptions.class)
+        .setExperiments(
+            
Collections.singletonList("unstable_not_update_compatible_new_window_optimization"));
+    ReduceFnTester<Integer, Iterable<Integer>, IntervalWindow> tester =
+        ReduceFnTester.nonCombining(strategy, options);
+
+    IntervalWindow window = new IntervalWindow(new Instant(0), new 
Instant(10));
+
+    assertTrue(
+        "Window should be new",
+        tester
+            .createRunner()
+            .getTriggerRunner()
+            .isNew(
+                tester.createRunner().getContextFactory().base(window, 
StateStyle.DIRECT).state()));
+
+    // 1. First element for a new window.
+    tester.injectElements(TimestampedValue.of(1, new Instant(1)));
+
+    BitSet bitSet =
+        tester
+            .createRunner()
+            .getTriggerRunner()
+            .getFinishedBits(
+                tester.createRunner().getContextFactory().base(window, 
StateStyle.DIRECT).state());
+    assertTrue("Bitset should be empty", bitSet.isEmpty());
+    assertFalse("Trigger should not be finished", bitSet.get(0));
+
+    assertFalse(
+        "Window should no longer be new",
+        tester
+            .createRunner()
+            .getTriggerRunner()
+            .isNew(
+                tester.createRunner().getContextFactory().base(window, 
StateStyle.DIRECT).state()));
+
+    // 2. Second element for the same window.
+    tester.injectElements(TimestampedValue.of(2, new Instant(2)));
+
+    // Extract output.
+    List<WindowedValue<Iterable<Integer>>> output = tester.extractOutput();
+    // 2 elements fired at end of window.
+    assertThat(output, contains(isSingleWindowedValue(containsInAnyOrder(1, 
2), 9, 0, 10)));
+  }
 }
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
index 2326a1c77d3..fe2c0a25346 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnTester.java
@@ -129,6 +129,19 @@ public class ReduceFnTester<InputT, OutputT, W extends 
BoundedWindow> {
         NullSideInputReader.empty());
   }
 
+  public static <W extends BoundedWindow>
+      ReduceFnTester<Integer, Iterable<Integer>, W> nonCombining(
+          WindowingStrategy<?, W> windowingStrategy, PipelineOptions options) 
throws Exception {
+    return new ReduceFnTester<>(
+        windowingStrategy,
+        TriggerStateMachines.stateMachineForTrigger(
+            TriggerTranslation.toProto(windowingStrategy.getTrigger())),
+        SystemReduceFn.buffering(VarIntCoder.of()),
+        IterableCoder.of(VarIntCoder.of()),
+        options,
+        NullSideInputReader.empty());
+  }
+
   /**
    * Creates a {@link ReduceFnTester} for the given {@link WindowingStrategy} 
and {@link
    * TriggerStateMachine}, for mocking the interactions between {@link 
ReduceFnRunner} and the
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java
new file mode 100644
index 00000000000..81c80753275
--- /dev/null
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/triggers/TriggerStateMachineRunnerTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core.triggers;
+
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.BitSet;
+import org.apache.beam.runners.core.StateAccessor;
+import org.apache.beam.sdk.state.ValueState;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+@RunWith(JUnit4.class)
+public class TriggerStateMachineRunnerTest {
+
+  @Mock private StateAccessor<?> mockState;
+  @Mock private ValueState<BitSet> mockFinishedSetState;
+  @Mock private TriggerStateMachineContextFactory<?> mockContextFactory;
+  @Mock private TriggerStateMachine mockTriggerStateMachine;
+
+  private ExecutableTriggerStateMachine rootTrigger;
+
+  @Before
+  public void setUp() {
+    MockitoAnnotations.initMocks(this);
+    when(mockState.access(TriggerStateMachineRunner.FINISHED_BITS_TAG))
+        .thenReturn((ValueState) mockFinishedSetState);
+    rootTrigger = 
ExecutableTriggerStateMachine.create(mockTriggerStateMachine);
+  }
+
+  @Test
+  public void testPersistFinishedSet_emptyAndOptimizationEnabled() throws 
Exception {
+    when(mockFinishedSetState.read()).thenReturn(null);
+
+    TriggerStateMachineRunner<?> runner =
+        new TriggerStateMachineRunner<>(
+            rootTrigger,
+            (TriggerStateMachineContextFactory) mockContextFactory,
+            true /* useNewWindowOptimization */);
+
+    FinishedTriggersBitSet modifiedFinishedSet = 
FinishedTriggersBitSet.emptyWithCapacity(1);
+
+    runner.persistFinishedSet(mockState, modifiedFinishedSet);
+
+    // Should write empty bitset because optimization is enabled
+    verify(mockFinishedSetState).write(modifiedFinishedSet.getBitSet());
+  }
+
+  @Test
+  public void testPersistFinishedSet_emptyAndOptimizationDisabled() throws 
Exception {
+    when(mockFinishedSetState.read()).thenReturn(null);
+
+    TriggerStateMachineRunner<?> runner =
+        new TriggerStateMachineRunner<>(
+            rootTrigger,
+            (TriggerStateMachineContextFactory) mockContextFactory,
+            false /* useNewWindowOptimization */);
+
+    FinishedTriggersBitSet modifiedFinishedSet = 
FinishedTriggersBitSet.emptyWithCapacity(1);
+
+    runner.persistFinishedSet(mockState, modifiedFinishedSet);
+
+    // Should NOT write empty bitset because optimization is disabled and it 
was already empty (read
+    // returned null)
+    verify(mockFinishedSetState, 
never()).write(modifiedFinishedSet.getBitSet());
+  }
+
+  private void runTestPersistFinishedSet_nonEmpty(boolean 
useNewWindowOptimization)
+      throws Exception {
+    when(mockFinishedSetState.read()).thenReturn(null);
+
+    TriggerStateMachineRunner<?> runner =
+        new TriggerStateMachineRunner<>(
+            rootTrigger,
+            (TriggerStateMachineContextFactory) mockContextFactory,
+            useNewWindowOptimization);
+
+    FinishedTriggersBitSet modifiedFinishedSet = 
FinishedTriggersBitSet.emptyWithCapacity(1);
+    modifiedFinishedSet.setFinished(rootTrigger, true);
+
+    runner.persistFinishedSet(mockState, modifiedFinishedSet);
+
+    // Should write non-empty bitset
+    verify(mockFinishedSetState).write(modifiedFinishedSet.getBitSet());
+  }
+
+  @Test
+  public void testPersistFinishedSet_nonEmpty() throws Exception {
+    runTestPersistFinishedSet_nonEmpty(false);
+  }
+
+  @Test
+  public void testPersistFinishedSet_nonEmptyAndOptimizationEnabled() throws 
Exception {
+    runTestPersistFinishedSet_nonEmpty(true);
+  }
+}
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
index b426b96cb5b..843a7347bdc 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillWatermarkHold.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.dataflow.worker.windmill.state;
 
+import static 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ExecutionException;
@@ -37,6 +39,7 @@ import org.joda.time.Instant;
   "nullness" // TODO(https://github.com/apache/beam/issues/20497)
 })
 public class WindmillWatermarkHold extends WindmillState implements 
WatermarkHoldState {
+
   // The encoded size of an Instant.
   private static final int ENCODED_SIZE = 8;
 
@@ -46,6 +49,7 @@ public class WindmillWatermarkHold extends WindmillState 
implements WatermarkHol
   private final String stateFamily;
 
   private boolean cleared = false;
+  private boolean knownEmpty = false;
   /**
    * If non-{@literal null}, the known current hold value, or absent if we 
know there are no output
    * watermark holds. If {@literal null}, the current hold value could depend 
on holds in Windmill
@@ -81,6 +85,17 @@ public class WindmillWatermarkHold extends WindmillState 
implements WatermarkHol
     cachedValue = Optional.absent();
   }
 
+  @Override
+  public void setKnownEmpty() {
+    checkState(localAdditions == null, "setKnownEmpty called with local 
additions");
+    checkState(!cleared, "setKnownEmpty called after clearing");
+    checkState(
+        cachedValue == null || !cachedValue.isPresent(),
+        "setKnownEmpty called with a cached value");
+    cachedValue = Optional.absent();
+    knownEmpty = true;
+  }
+
   @Override
   @SuppressWarnings("FutureReturnValueIgnored")
   public WindmillWatermarkHold readLater() {
@@ -137,48 +152,67 @@ public class WindmillWatermarkHold extends WindmillState 
implements WatermarkHol
 
     Future<Windmill.WorkItemCommitRequest> result;
 
-    if (!cleared && localAdditions == null) {
-      // No changes, so no need to update Windmill and no need to cache any 
value.
-      return 
Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
-    }
-
-    if (cleared && localAdditions == null) {
-      // Just clearing the persisted state; blind delete
-      Windmill.WorkItemCommitRequest.Builder commitBuilder =
-          Windmill.WorkItemCommitRequest.newBuilder();
-      commitBuilder
-          .addWatermarkHoldsBuilder()
-          .setTag(stateKey.byteString())
-          .setStateFamily(stateFamily)
-          .setReset(true);
+    if (knownEmpty) {
+      if (localAdditions != null) {
+        // 1. We know it's empty, so we can just update with localAdditions
+        Windmill.WorkItemCommitRequest.Builder commitBuilder =
+            Windmill.WorkItemCommitRequest.newBuilder();
+        commitBuilder
+            .addWatermarkHoldsBuilder()
+            .setTag(stateKey.byteString())
+            .setStateFamily(stateFamily)
+            
.addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions));
 
-      result = Futures.immediateFuture(commitBuilder.buildPartial());
-    } else if (cleared && localAdditions != null) {
-      // Since we cleared before adding, we can do a blind overwrite of 
persisted state
-      Windmill.WorkItemCommitRequest.Builder commitBuilder =
-          Windmill.WorkItemCommitRequest.newBuilder();
-      commitBuilder
-          .addWatermarkHoldsBuilder()
-          .setTag(stateKey.byteString())
-          .setStateFamily(stateFamily)
-          .setReset(true)
-          
.addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions));
+        cachedValue = Optional.of(localAdditions);
+        result = Futures.immediateFuture(commitBuilder.buildPartial());
+      } else {
+        // 2. State is known to be empty and there are no local additions.
+        // Whether 'cleared' was called or not, the desired state is empty.
+        // So no need to update Windmill.
+        result =
+            
Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
+      }
+    } else if (cleared) {
+      if (localAdditions == null) {
+        // 3. Just clearing the persisted state; blind delete
+        Windmill.WorkItemCommitRequest.Builder commitBuilder =
+            Windmill.WorkItemCommitRequest.newBuilder();
+        commitBuilder
+            .addWatermarkHoldsBuilder()
+            .setTag(stateKey.byteString())
+            .setStateFamily(stateFamily)
+            .setReset(true);
 
-      cachedValue = Optional.of(localAdditions);
+        result = Futures.immediateFuture(commitBuilder.buildPartial());
+      } else {
+        // 4. Since we cleared before adding, we can do an overwrite of 
persisted state
+        Windmill.WorkItemCommitRequest.Builder commitBuilder =
+            Windmill.WorkItemCommitRequest.newBuilder();
+        commitBuilder
+            .addWatermarkHoldsBuilder()
+            .setTag(stateKey.byteString())
+            .setStateFamily(stateFamily)
+            .setReset(true)
+            
.addTimestamps(WindmillTimeUtils.harnessToWindmillTimestamp(localAdditions));
 
-      result = Futures.immediateFuture(commitBuilder.buildPartial());
-    } else if (!cleared && localAdditions != null) {
-      // Otherwise, we need to combine the local additions with the already 
persisted data
+        cachedValue = Optional.of(localAdditions);
+        result = Futures.immediateFuture(commitBuilder.buildPartial());
+      }
+    } else if (localAdditions != null) {
+      // 5. Otherwise, we need to combine the local additions with the already 
persisted data
       result = combineWithPersisted();
     } else {
-      throw new IllegalStateException("Unreachable condition");
+      // No changes, so no need to update Windmill and no need to cache any 
value.
+      return 
Futures.immediateFuture(Windmill.WorkItemCommitRequest.newBuilder().buildPartial());
     }
 
     final int estimatedByteSize = ENCODED_SIZE + stateKey.byteString().size();
+
     return Futures.lazyTransform(
         result,
         result1 -> {
           cleared = false;
+          knownEmpty = false;
           localAdditions = null;
           if (cachedValue != null) {
             cache.put(namespace, stateKey, WindmillWatermarkHold.this, 
estimatedByteSize);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
index 5fbae493581..87b746089f1 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/state/WindmillStateInternalsTest.java
@@ -3025,18 +3025,175 @@ public class WindmillStateInternalsTest {
     StateTag<WatermarkHoldState> addr =
         StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST);
 
-    WatermarkHoldState bag = underTest.state(NAMESPACE, addr);
+    WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
 
-    bag.clear();
-    assertThat(bag.read(), Matchers.nullValue());
+    hold.clear();
+    assertThat(hold.read(), Matchers.nullValue());
 
-    bag.add(new Instant(300));
-    assertThat(bag.read(), Matchers.equalTo(new Instant(300)));
+    hold.add(new Instant(300));
+    assertThat(hold.read(), Matchers.equalTo(new Instant(300)));
 
     // Shouldn't need to read from windmill because the value is already 
available.
     Mockito.verifyNoMoreInteractions(mockReader);
   }
 
+  @Test
+  public void testWatermarkSetKnownEmptyBeforeRead() throws Exception {
+    StateTag<WatermarkHoldState> holdTag =
+        StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+    WatermarkHoldState watermarkHold = underTest.state(NAMESPACE, holdTag);
+
+    // Once the hold is marked known-empty, the first read is answered locally with null.
+    watermarkHold.setKnownEmpty();
+    assertThat(watermarkHold.read(), Matchers.nullValue());
+
+    // A later add must be visible to reads, still without consulting the backend.
+    Instant holdTime = new Instant(300);
+    watermarkHold.add(holdTime);
+    assertThat(watermarkHold.read(), Matchers.equalTo(holdTime));
+
+    // Neither read above is expected to have reached Windmill.
+    Mockito.verifyNoMoreInteractions(mockReader);
+  }
+
+  @Test
+  public void testWatermarkSetKnownEmptyThenAddPersist() throws Exception {
+    StateTag<WatermarkHoldState> addr =
+        StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST);
+
+    WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
+
+    hold.setKnownEmpty();
+    hold.add(new Instant(1000));
+
+    Windmill.WorkItemCommitRequest.Builder commitBuilder =
+        Windmill.WorkItemCommitRequest.newBuilder();
+    underTest.persist(commitBuilder);
+
+    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
+    assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
+    assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), 
watermarkHold.getTimestamps(0));
+
+    Mockito.verifyNoInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
+  }
+
+  @Test
+  public void testNewWatermarkAddPersist() throws Exception {
+    StateTag<WatermarkHoldState> addr =
+        StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST);
+
+    WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr);
+
+    hold.add(new Instant(1000));
+
+    Windmill.WorkItemCommitRequest.Builder commitBuilder =
+        Windmill.WorkItemCommitRequest.newBuilder();
+    underTestNewKey.persist(commitBuilder);
+
+    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
+    assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
+    assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), 
watermarkHold.getTimestamps(0));
+
+    Mockito.verifyNoInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
+  }
+
+  @Test
+  public void testNewWatermarkClearPersist() throws Exception {
+    StateTag<WatermarkHoldState> addr =
+        StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+
+    WatermarkHoldState hold = underTestNewKey.state(NAMESPACE, addr);
+
+    // Exercise the clear path on a new key. Previously this test never called clear() and was
+    // an exact duplicate of testNewWatermarkAddPersist. A hold on a new key has never been
+    // persisted, so adding and then clearing it is expected to leave nothing to commit and to
+    // need no read. This mirrors testWatermarkSetKnownEmptyThenClearPersist.
+    hold.add(new Instant(1000));
+    hold.clear();
+
+    Windmill.WorkItemCommitRequest.Builder commitBuilder =
+        Windmill.WorkItemCommitRequest.newBuilder();
+    underTestNewKey.persist(commitBuilder);
+
+    assertEquals(0, commitBuilder.getWatermarkHoldsCount());
+
+    Mockito.verifyNoInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
+  }
+
+  @Test
+  public void testWatermarkSetKnownEmptyThenClearPersist() throws Exception {
+    StateTag<WatermarkHoldState> holdTag =
+        StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+    WatermarkHoldState state = underTest.state(NAMESPACE, holdTag);
+
+    // Clearing a hold that is already known to be empty should produce no hold entry in the
+    // commit and should not trigger a read.
+    state.setKnownEmpty();
+    state.clear();
+
+    Windmill.WorkItemCommitRequest.Builder commit = Windmill.WorkItemCommitRequest.newBuilder();
+    underTest.persist(commit);
+
+    assertEquals(0, commit.getWatermarkHoldsCount());
+
+    Mockito.verifyNoInteractions(mockReader);
+
+    assertBuildable(commit);
+  }
+
+  @Test
+  public void testWatermarkSetKnownEmptyThenPersist() throws Exception {
+    StateTag<WatermarkHoldState> holdTag =
+        StateTags.watermarkStateInternal("watermark", TimestampCombiner.EARLIEST);
+    WatermarkHoldState state = underTest.state(NAMESPACE, holdTag);
+
+    // Marking the hold known-empty on its own is a purely local hint: persisting afterwards
+    // should write nothing and read nothing.
+    state.setKnownEmpty();
+
+    Windmill.WorkItemCommitRequest.Builder commit = Windmill.WorkItemCommitRequest.newBuilder();
+    underTest.persist(commit);
+
+    assertEquals(0, commit.getWatermarkHoldsCount());
+
+    Mockito.verifyNoInteractions(mockReader);
+
+    assertBuildable(commit);
+  }
+
+  @Test
+  public void testWatermarkSetKnownEmptyThenClearThenAddPersist() throws 
Exception {
+    StateTag<WatermarkHoldState> addr =
+        StateTags.watermarkStateInternal("watermark", 
TimestampCombiner.EARLIEST);
+
+    WatermarkHoldState hold = underTest.state(NAMESPACE, addr);
+
+    hold.setKnownEmpty();
+    hold.clear();
+    hold.add(new Instant(1000));
+
+    Windmill.WorkItemCommitRequest.Builder commitBuilder =
+        Windmill.WorkItemCommitRequest.newBuilder();
+    underTest.persist(commitBuilder);
+
+    assertEquals(1, commitBuilder.getWatermarkHoldsCount());
+
+    Windmill.WatermarkHold watermarkHold = commitBuilder.getWatermarkHolds(0);
+    assertEquals(key(NAMESPACE, "watermark"), watermarkHold.getTag());
+    assertEquals(TimeUnit.MILLISECONDS.toMicros(1000), 
watermarkHold.getTimestamps(0));
+
+    Mockito.verifyNoInteractions(mockReader);
+
+    assertBuildable(commitBuilder);
+  }
+
   @Test
   public void testWatermarkPersistEarliest() throws Exception {
     StateTag<WatermarkHoldState> addr =
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
index 6d4183da101..f8b09bcf03a 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/state/WatermarkHoldState.java
@@ -38,4 +38,14 @@ public interface WatermarkHoldState extends 
GroupingState<Instant, Instant> {
 
   @Override
   WatermarkHoldState readLater();
+
+  /**
+   * <b><i>For internal use only; no backwards-compatibility guarantees.</i></b>
+   *
+   * <p>Marks this hold as known to be empty in local state, without necessarily clearing it in
+   * the backend.
+   *
+   * <p>This may be used by runners to optimize out unnecessary state reads.
+   *
+   * <p>The default implementation does nothing, so implementations that do not override it
+   * behave exactly as if this method had not been called.
+   */
+  @Internal
+  default void setKnownEmpty() {}
 }


Reply via email to