This is an automated email from the ASF dual-hosted git repository.

kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 81bb5066bab [Java] Dataflow runner v1 - Propagate drain mode (#36534)
81bb5066bab is described below

commit 81bb5066bab883f79328e52f0d2a55e9b90f2f65
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Tue Dec 2 17:54:19 2025 +0100

    [Java] Dataflow runner v1 - Propagate drain mode (#36534)
---
 .../dataflow/worker/StreamingDataflowWorker.java   |  2 +
 .../worker/StreamingModeExecutionContext.java      | 14 ++++-
 .../dataflow/worker/UngroupedWindmillReader.java   | 25 ++++++--
 .../dataflow/worker/WindmillKeyedWorkItem.java     | 26 ++++++--
 .../dataflow/worker/WindmillTimerInternals.java    |  7 ++-
 .../dataflow/worker/WindowingWindmillReader.java   |  3 +-
 .../runners/dataflow/worker/streaming/Work.java    | 11 +++-
 .../harness/SingleSourceWorkerHarness.java         |  3 +
 .../client/grpc/GetWorkResponseChunkAssembler.java |  5 +-
 .../client/grpc/GrpcDirectGetWorkStream.java       |  1 +
 .../windmill/client/grpc/GrpcGetWorkStream.java    |  1 +
 .../worker/windmill/work/WorkItemReceiver.java     |  1 +
 .../worker/windmill/work/WorkItemScheduler.java    |  2 +
 .../work/processing/StreamingWorkScheduler.java    |  4 +-
 .../dataflow/worker/FakeWindmillServer.java        |  1 +
 .../worker/StreamingDataflowWorkerTest.java        |  2 +
 .../worker/StreamingGroupAlsoByWindowFnsTest.java  |  2 +-
 ...reamingGroupAlsoByWindowsReshuffleDoFnTest.java |  2 +-
 .../worker/StreamingModeExecutionContextTest.java  |  1 +
 .../dataflow/worker/WindmillKeyedWorkItemTest.java | 72 +++++++++++++++++++++-
 .../worker/WindmillTimerInternalsTest.java         |  6 +-
 .../dataflow/worker/WorkerCustomSourcesTest.java   |  2 +
 .../worker/streaming/ActiveWorkStateTest.java      |  2 +
 .../streaming/ComputationStateCacheTest.java       |  1 +
 .../FanOutStreamingEngineWorkerHarnessTest.java    |  1 +
 .../harness/WindmillStreamSenderTest.java          |  1 +
 .../worker/util/BoundedQueueExecutorTest.java      |  1 +
 .../StreamingApplianceWorkCommitterTest.java       |  1 +
 .../commits/StreamingEngineWorkCommitterTest.java  |  1 +
 .../client/grpc/GrpcDirectGetWorkStreamTest.java   | 12 +++-
 .../client/grpc/GrpcWindmillServerTest.java        |  6 +-
 .../failures/WorkFailureProcessorTest.java         |  1 +
 .../work/refresh/ActiveWorkRefresherTest.java      |  1 +
 33 files changed, 195 insertions(+), 26 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 83e924514b5..aad27b86986 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -389,6 +389,7 @@ public final class StreamingDataflowWorker {
                 serializedWorkItemSize,
                 watermarks,
                 processingContext,
+                drainMode,
                 getWorkStreamLatencies) ->
                 computationStateCache
                     .get(processingContext.computationId())
@@ -401,6 +402,7 @@ public final class StreamingDataflowWorker {
                               serializedWorkItemSize,
                               watermarks,
                               processingContext,
+                              drainMode,
                               getWorkStreamLatencies);
                         }),
             ChannelCachingRemoteStubFactory.create(options.getGcpCredential(), 
channelCache),
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index f5157bb4695..e3424e3d667 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -196,6 +196,10 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
     return work != null && work.isFailed();
   }
 
+  public boolean getDrainMode() {
+    return work != null ? work.getDrainMode() : false;
+  }
+
   public boolean offsetBasedDeduplicationSupported() {
     return activeReader != null
         && activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
@@ -820,7 +824,10 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
                 .transform(
                     timer ->
                         WindmillTimerInternals.windmillTimerToTimerData(
-                            WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, 
timer, windowCoder))
+                            WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                            timer,
+                            windowCoder,
+                            getDrainMode()))
                 .iterator();
       }
 
@@ -880,7 +887,10 @@ public class StreamingModeExecutionContext extends 
DataflowExecutionContext<Step
                     .transform(
                         timer ->
                             WindmillTimerInternals.windmillTimerToTimerData(
-                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX, 
timer, windowCoder))
+                                WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+                                timer,
+                                windowCoder,
+                                getDrainMode()))
                     .iterator());
       }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
index a9a033c89ad..c248259a12d 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.Collection;
 import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.dataflow.util.CloudObject;
 import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
 import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -117,8 +118,16 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
       Collection<? extends BoundedWindow> windows =
           WindmillSink.decodeMetadataWindows(windowsCoder, 
message.getMetadata());
       PaneInfo paneInfo = 
WindmillSink.decodeMetadataPane(message.getMetadata());
+      /**
+       * https://s.apache.org/beam-drain-mode - propagate drain bit if 
aggregation/expiry induced by
+       * drain happened upstream
+       */
+      boolean drainingValueFromUpstream = false;
       if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
-        WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
+        BeamFnApi.Elements.ElementMetadata elementMetadata =
+            WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
+        drainingValueFromUpstream =
+            elementMetadata.getDrain() == 
BeamFnApi.Elements.DrainMode.Enum.DRAINING;
       }
       if (valueCoder instanceof KvCoder) {
         KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -128,12 +137,18 @@ class UngroupedWindmillReader<T> extends 
NativeReader<WindowedValue<T>> {
         @SuppressWarnings("unchecked")
         T result =
             (T) KV.of(decode(kvCoder.getKeyCoder(), key), 
decode(kvCoder.getValueCoder(), data));
-        // todo #33176 propagate metadata to windowed value
-        return WindowedValues.of(result, timestampMillis, windows, paneInfo);
+        return WindowedValues.of(
+            result, timestampMillis, windows, paneInfo, null, null, 
drainingValueFromUpstream);
       } else {
         notifyElementRead(data.available() + metadata.available());
-        // todo #33176 propagate metadata to windowed value
-        return WindowedValues.of(decode(valueCoder, data), timestampMillis, 
windows, paneInfo);
+        return WindowedValues.of(
+            decode(valueCoder, data),
+            timestampMillis,
+            windows,
+            paneInfo,
+            null,
+            null,
+            drainingValueFromUpstream);
       }
     }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index 6690377d3de..415dab526bb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.KeyedWorkItemCoder;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -60,6 +61,8 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
 
   private final Windmill.WorkItem workItem;
   private final K key;
+  // used to inform that timer was caused by drain
+  private final boolean drainMode;
 
   private final transient Coder<? extends BoundedWindow> windowCoder;
   private final transient Coder<Collection<? extends BoundedWindow>> 
windowsCoder;
@@ -70,12 +73,14 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
       Windmill.WorkItem workItem,
       Coder<? extends BoundedWindow> windowCoder,
       Coder<Collection<? extends BoundedWindow>> windowsCoder,
-      Coder<ElemT> valueCoder) {
+      Coder<ElemT> valueCoder,
+      boolean drainMode) {
     this.key = key;
     this.workItem = workItem;
     this.windowCoder = windowCoder;
     this.windowsCoder = windowsCoder;
     this.valueCoder = valueCoder;
+    this.drainMode = drainMode;
   }
 
   @Override
@@ -93,7 +98,10 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
         .transform(
             timer ->
                 WindmillTimerInternals.windmillTimerToTimerData(
-                    WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer, 
windowCoder));
+                    WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+                    timer,
+                    windowCoder,
+                    drainMode));
   }
 
   @Override
@@ -108,13 +116,21 @@ public class WindmillKeyedWorkItem<K, ElemT> implements 
KeyedWorkItem<K, ElemT>
                 Collection<? extends BoundedWindow> windows =
                     WindmillSink.decodeMetadataWindows(windowsCoder, 
message.getMetadata());
                 PaneInfo paneInfo = 
WindmillSink.decodeMetadataPane(message.getMetadata());
+                /**
+                 * https://s.apache.org/beam-drain-mode - propagate drain bit 
if aggregation/expiry
+                 * induced by drain happened upstream
+                 */
+                boolean drainingValueFromUpstream = false;
                 if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
-                  WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
+                  BeamFnApi.Elements.ElementMetadata elementMetadata =
+                      WindmillSink.decodeAdditionalMetadata(windowsCoder, 
message.getMetadata());
+                  drainingValueFromUpstream =
+                      elementMetadata.getDrain() == 
BeamFnApi.Elements.DrainMode.Enum.DRAINING;
                 }
                 InputStream inputStream = message.getData().newInput();
                 ElemT value = valueCoder.decode(inputStream, 
Coder.Context.OUTER);
-                // todo #33176 specify additional metadata in the future
-                return WindowedValues.of(value, timestamp, windows, paneInfo);
+                return WindowedValues.of(
+                    value, timestamp, windows, paneInfo, null, null, 
drainingValueFromUpstream);
               } catch (IOException e) {
                 throw new RuntimeException(e);
               }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index ee73ac138f0..8dac9d11715 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -301,7 +301,10 @@ class WindmillTimerInternals implements TimerInternals {
   }
 
   public static TimerData windmillTimerToTimerData(
-      WindmillNamespacePrefix prefix, Timer timer, Coder<? extends 
BoundedWindow> windowCoder) {
+      WindmillNamespacePrefix prefix,
+      Timer timer,
+      Coder<? extends BoundedWindow> windowCoder,
+      boolean draining) {
 
     // The tag is a path-structure string but cheaper to parse than a proper 
URI. It follows
     // this pattern, where no component but the ID can contain a slash
@@ -395,6 +398,8 @@ class WindmillTimerInternals implements TimerInternals {
         timestamp,
         outputTimestamp,
         timerTypeToTimeDomain(timer.getType()));
+    // todo add draining (https://github.com/apache/beam/issues/36884)
+
   }
 
   private static boolean useNewTimerTagEncoding(TimerData timerData) {
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
index d91a5412b91..f4a6eec61cb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
@@ -119,7 +119,8 @@ class WindowingWindmillReader<K, T> extends 
NativeReader<WindowedValue<KeyedWork
     final K key = keyCoder.decode(context.getSerializedKey().newInput(), 
Coder.Context.OUTER);
     final WorkItem workItem = context.getWorkItem();
     KeyedWorkItem<K, T> keyedWorkItem =
-        new WindmillKeyedWorkItem<>(key, workItem, windowCoder, windowsCoder, 
valueCoder);
+        new WindmillKeyedWorkItem<>(
+            key, workItem, windowCoder, windowsCoder, valueCoder, 
context.getDrainMode());
     final boolean isEmptyWorkItem =
         (Iterables.isEmpty(keyedWorkItem.timersIterable())
             && Iterables.isEmpty(keyedWorkItem.elementsIterable()));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index 8b41a2d1321..43f355dd7ef 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -78,12 +78,14 @@ public final class Work implements RefreshableWork {
   private volatile TimedState currentState;
   private volatile boolean isFailed;
   private volatile String processingThreadName = "";
+  private final boolean drainMode;
 
   private Work(
       WorkItem workItem,
       long serializedWorkItemSize,
       Watermarks watermarks,
       ProcessingContext processingContext,
+      boolean drainMode,
       Supplier<Instant> clock) {
     this.shardedKey = ShardedKey.create(workItem.getKey(), 
workItem.getShardingKey());
     this.workItem = workItem;
@@ -91,6 +93,7 @@ public final class Work implements RefreshableWork {
     this.processingContext = processingContext;
     this.watermarks = watermarks;
     this.clock = clock;
+    this.drainMode = drainMode;
     this.startTime = clock.get();
     Preconditions.checkState(EMPTY_ENUM_MAP.isEmpty());
     // Create by passing EMPTY_ENUM_MAP to avoid recreating
@@ -110,8 +113,10 @@ public final class Work implements RefreshableWork {
       long serializedWorkItemSize,
       Watermarks watermarks,
       ProcessingContext processingContext,
+      boolean drainMode,
       Supplier<Instant> clock) {
-    return new Work(workItem, serializedWorkItemSize, watermarks, 
processingContext, clock);
+    return new Work(
+        workItem, serializedWorkItemSize, watermarks, processingContext, 
drainMode, clock);
   }
 
   public static ProcessingContext createProcessingContext(
@@ -207,6 +212,10 @@ public final class Work implements RefreshableWork {
     return currentState.state();
   }
 
+  public boolean getDrainMode() {
+    return drainMode;
+  }
+
   public void setState(State state) {
     Instant now = clock.get();
     totalDurationPerState.compute(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
index 0de9d130b65..af7746d6902 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
@@ -155,6 +155,7 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
               (computationId,
                   inputDataWatermark,
                   synchronizedProcessingTime,
+                  drainMode,
                   workItem,
                   serializedWorkItemSize,
                   getWorkStreamLatencies) ->
@@ -178,6 +179,7 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
                                     getDataClient,
                                     workCommitter::commit,
                                     heartbeatSender),
+                                drainMode,
                                 getWorkStreamLatencies);
                           }));
       try {
@@ -239,6 +241,7 @@ public final class SingleSourceWorkerHarness implements 
StreamingWorkerHarness {
               
watermarks.setOutputDataWatermark(workItem.getOutputDataWatermark()).build(),
               Work.createProcessingContext(
                   computationId, getDataClient, workCommitter::commit, 
heartbeatSender),
+              computationWork.getDrainMode(),
               /* getWorkStreamLatencies= */ ImmutableList.of());
         }
       }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
index 0ebb4726d3a..3608bd1ccac 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
@@ -124,7 +124,8 @@ final class GetWorkResponseChunkAssembler {
           metadataProto.getComputationId(),
           
WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()),
           WindmillTimeUtils.windmillToHarnessWatermark(
-              metadataProto.getDependentRealtimeInputWatermark()));
+              metadataProto.getDependentRealtimeInputWatermark()),
+          metadataProto.getDrainMode());
     }
 
     abstract String computationId();
@@ -132,6 +133,8 @@ final class GetWorkResponseChunkAssembler {
     abstract @Nullable Instant inputDataWatermark();
 
     abstract @Nullable Instant synchronizedProcessingTime();
+
+    abstract boolean drainMode();
   }
 
   @AutoValue
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
index 2712bf1bd33..8eb4c51a2b4 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
@@ -281,6 +281,7 @@ final class GrpcDirectGetWorkStream
         assembledWorkItem.bufferedSize(),
         createWatermarks(workItem, metadata),
         createProcessingContext(metadata.computationId()),
+        metadata.drainMode(),
         assembledWorkItem.latencyAttributions());
     budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
     GetWorkBudget extension = budgetTracker.computeBudgetExtension();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
index ae7ce85e13a..58407ad8147 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
@@ -203,6 +203,7 @@ final class GrpcGetWorkStream
         assembledWorkItem.computationMetadata().computationId(),
         assembledWorkItem.computationMetadata().inputDataWatermark(),
         assembledWorkItem.computationMetadata().synchronizedProcessingTime(),
+        assembledWorkItem.computationMetadata().drainMode(),
         assembledWorkItem.workItem(),
         assembledWorkItem.bufferedSize(),
         assembledWorkItem.latencyAttributions());
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
index e2f69585e48..71e524a308a 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
@@ -30,6 +30,7 @@ public interface WorkItemReceiver {
       String computation,
       @Nullable Instant inputDataWatermark,
       @Nullable Instant synchronizedProcessingTime,
+      boolean drainMode,
       Windmill.WorkItem workItem,
       long serializedWorkItemSize,
       ImmutableList<LatencyAttribution> getWorkStreamLatencies);
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
index b9d31fbe501..4121aa758ba 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
@@ -35,6 +35,7 @@ public interface WorkItemScheduler {
    * @param workItem {@link WorkItem} to be processed.
    * @param watermarks processing watermarks for the workItem.
    * @param processingContext for processing the workItem.
+   * @param drainMode whether the job is draining.
    * @param getWorkStreamLatencies Latencies per processing stage for the 
WorkItem for reporting
    *     back to Streaming Engine backend.
    */
@@ -43,5 +44,6 @@ public interface WorkItemScheduler {
       long serializedWorkItemSize,
       Watermarks watermarks,
       Work.ProcessingContext processingContext,
+      boolean drainMode,
       ImmutableList<LatencyAttribution> getWorkStreamLatencies);
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index e61c2d1f4a0..242e4a5f0db 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -210,10 +210,12 @@ public class StreamingWorkScheduler {
       long serializedWorkItemSize,
       Watermarks watermarks,
       Work.ProcessingContext processingContext,
+      boolean drainMode,
       ImmutableList<LatencyAttribution> getWorkStreamLatencies) {
     computationState.activateWork(
         ExecutableWork.create(
-            Work.create(workItem, serializedWorkItemSize, watermarks, 
processingContext, clock),
+            Work.create(
+                workItem, serializedWorkItemSize, watermarks, 
processingContext, drainMode, clock),
             work -> processWork(computationState, work, 
getWorkStreamLatencies)));
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index a5c8909b8d0..1c5f7504bf3 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -275,6 +275,7 @@ public final class FakeWindmillServer extends 
WindmillServerStub {
                   computationWork.getComputationId(),
                   inputDataWatermark,
                   Instant.now(),
+                  computationWork.getDrainMode(),
                   workItem,
                   workItem.getSerializedSize(),
                   ImmutableList.of(
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index df90bb96139..d11c6c37433 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -372,6 +372,7 @@ public class StreamingDataflowWorkerTest {
             Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
             Work.createProcessingContext(
                 computationId, new FakeGetDataClient(), ignored -> {}, 
mock(HeartbeatSender.class)),
+            false,
             Instant::now),
         processWorkFn);
   }
@@ -3624,6 +3625,7 @@ public class StreamingDataflowWorkerTest {
                 new FakeGetDataClient(),
                 ignored -> {},
                 mock(HeartbeatSender.class)),
+            false,
             clock);
 
     clock.sleep(Duration.millis(10));
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index 1182a2c0b9e..f7852ec1767 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -194,7 +194,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
     return new ValueInEmptyWindows<>(
         (KeyedWorkItem<String, T>)
             new WindmillKeyedWorkItem<>(
-                KEY, workItem.build(), windowCoder, wildcardWindowsCoder, 
valueCoder));
+                KEY, workItem.build(), windowCoder, wildcardWindowsCoder, 
valueCoder, false));
   }
 
   @Test
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
index a348c0f0021..52c9844add8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
@@ -132,7 +132,7 @@ public class StreamingGroupAlsoByWindowsReshuffleDoFnTest {
     return new ValueInEmptyWindows<>(
         (KeyedWorkItem<String, T>)
             new WindmillKeyedWorkItem<>(
-                KEY, workItem.build(), windowCoder, wildcardWindowsCoder, 
valueCoder));
+                KEY, workItem.build(), windowCoder, wildcardWindowsCoder, 
valueCoder, false));
   }
 
   @Test
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index e216f912d77..93b279f0aec 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -143,6 +143,7 @@ public class StreamingModeExecutionContextTest {
         watermarks,
         Work.createProcessingContext(
             COMPUTATION_ID, new FakeGetDataClient(), ignored -> {}, 
mock(HeartbeatSender.class)),
+        false,
         Instant::now);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index 53a36722e41..bbdde449860 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Iterator;
 import org.apache.beam.model.fnexecution.v1.BeamFnApi;
 import org.apache.beam.runners.core.KeyedWorkItem;
 import org.apache.beam.runners.core.StateNamespace;
@@ -40,10 +41,12 @@ import 
org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.values.WindowedValue;
 import org.apache.beam.sdk.values.WindowedValues;
 import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
 import org.hamcrest.Matchers;
 import org.joda.time.Instant;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -93,7 +96,7 @@ public class WindmillKeyedWorkItemTest {
 
     KeyedWorkItem<String, String> keyedWorkItem =
         new WindmillKeyedWorkItem<>(
-            KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER);
+            KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, 
false);
 
     assertThat(
         keyedWorkItem.elementsIterable(),
@@ -123,6 +126,24 @@ public class WindmillKeyedWorkItemTest {
         .setMetadata(encodedMetadata);
   }
 
+  private void addElementWithMetadata(
+      Windmill.InputMessageBundle.Builder chunk,
+      long timestamp,
+      String value,
+      IntervalWindow window,
+      PaneInfo pane,
+      BeamFnApi.Elements.ElementMetadata metadata)
+      throws IOException {
+    ByteString encodedMetadata =
+        WindmillSink.encodeMetadata(
+            WINDOWS_CODER, Collections.singletonList(window), pane, metadata);
+    chunk
+        .addMessagesBuilder()
+        .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new 
Instant(timestamp)))
+        .setData(ByteString.copyFromUtf8(value))
+        .setMetadata(encodedMetadata);
+  }
+
   private PaneInfo paneInfo(int index) {
     return PaneInfo.createPane(false, false, Timing.EARLY, index, -1);
   }
@@ -148,7 +169,7 @@ public class WindmillKeyedWorkItemTest {
             .build();
 
     KeyedWorkItem<String, String> keyedWorkItem =
-        new WindmillKeyedWorkItem<>(KEY, workItem, WINDOW_CODER, 
WINDOWS_CODER, VALUE_CODER);
+        new WindmillKeyedWorkItem<>(KEY, workItem, WINDOW_CODER, 
WINDOWS_CODER, VALUE_CODER, false);
 
     assertThat(
         keyedWorkItem.timersIterable(),
@@ -186,4 +207,51 @@ public class WindmillKeyedWorkItemTest {
         FakeKeyedWorkItemCoder.of(
             KvCoder.of(GlobalWindow.Coder.INSTANCE, 
GlobalWindow.Coder.INSTANCE)));
   }
+
+  @Test
+  public void testDrainPropagated() throws Exception {
+    WindowedValues.FullWindowedValueCoder.setMetadataSupported();
+    Windmill.WorkItem.Builder workItem =
+        Windmill.WorkItem.newBuilder()
+            .setKey(SERIALIZED_KEY)
+            .setTimers(
+                Windmill.TimerBundle.newBuilder()
+                    .addTimers(
+                        makeSerializedTimer(STATE_NAMESPACE_2, 3, 
Windmill.Timer.Type.WATERMARK))
+                    .build())
+            .setWorkToken(17);
+    Windmill.InputMessageBundle.Builder chunk1 = 
workItem.addMessageBundlesBuilder();
+    chunk1.setSourceComputationId("computation");
+    addElementWithMetadata(
+        chunk1,
+        5,
+        "hello",
+        WINDOW_1,
+        paneInfo(0),
+        BeamFnApi.Elements.ElementMetadata.newBuilder()
+            .setDrain(BeamFnApi.Elements.DrainMode.Enum.DRAINING)
+            .build());
+    addElementWithMetadata(
+        chunk1,
+        7,
+        "world",
+        WINDOW_2,
+        paneInfo(2),
+        BeamFnApi.Elements.ElementMetadata.newBuilder()
+            .setDrain(BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
+            .build());
+    KeyedWorkItem<String, String> keyedWorkItem =
+        new WindmillKeyedWorkItem<>(
+            KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, 
true);
+
+    Iterator<WindowedValue<String>> iterator = 
keyedWorkItem.elementsIterable().iterator();
+    Assert.assertTrue(iterator.next().causedByDrain());
+    Assert.assertFalse(iterator.next().causedByDrain());
+
+    // todo add assert for draining once timerdata is filled
+    // (https://github.com/apache/beam/issues/36884)
+    assertThat(
+        keyedWorkItem.timersIterable(),
+        Matchers.contains(makeTimer(STATE_NAMESPACE_2, 3, 
TimeDomain.EVENT_TIME)));
+  }
 }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index ec8672b6a75..4780cd768ef 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -96,7 +96,8 @@ public class WindmillTimerInternalsTest {
                     WindmillTimerInternals.windmillTimerToTimerData(
                         prefix,
                         
WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer),
-                        coder);
+                        coder,
+                        false);
                 // The function itself bounds output, so we dont expect the 
original input as the
                 // output, we expect it to be bounded
                 TimerData expected =
@@ -145,7 +146,8 @@ public class WindmillTimerInternalsTest {
                           prefix,
                           WindmillTimerInternals.timerDataToWindmillTimer(
                               stateFamily, prefix, timer),
-                          coder),
+                          coder,
+                          false),
                       equalTo(expected));
                 }
               }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index df3b959c82c..334b9414b26 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -204,6 +204,7 @@ public class WorkerCustomSourcesTest {
         watermarks,
         Work.createProcessingContext(
             COMPUTATION_ID, new FakeGetDataClient(), ignored -> {}, 
mock(HeartbeatSender.class)),
+        false,
         Instant::now);
   }
 
@@ -1014,6 +1015,7 @@ public class WorkerCustomSourcesTest {
                 new FakeGetDataClient(),
                 ignored -> {},
                 mock(HeartbeatSender.class)),
+            false,
             Instant::now);
     context.start(
         "key",
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
index c0cb8241d73..865ae261280 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
@@ -71,6 +71,7 @@ public class ActiveWorkStateTest {
             workItem.getSerializedSize(),
             Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
             createWorkProcessingContext(),
+            false,
             Instant::now),
         ignored -> {});
   }
@@ -82,6 +83,7 @@ public class ActiveWorkStateTest {
             workItem.getSerializedSize(),
             Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
             createWorkProcessingContext(),
+            false,
             () -> Instant.EPOCH),
         ignored -> {});
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java
index 935b25acb6f..1c8b8fca131 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java
@@ -75,6 +75,7 @@ public class ComputationStateCacheTest {
                 new FakeGetDataClient(),
                 ignored -> {},
                 mock(HeartbeatSender.class)),
+            false,
             Instant::now),
         ignored -> {});
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
index 65e40f171b0..94c8f4b7595 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
@@ -128,6 +128,7 @@ public class FanOutStreamingEngineWorkerHarnessTest {
         serializedWorkItemSize,
         watermarks,
         processingContext,
+        drainMode,
         getWorkStreamLatencies) -> {};
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java
index b94270ad7bb..3217c736adb 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java
@@ -68,6 +68,7 @@ public class WindmillStreamSenderTest {
           serializedWorkItemSize,
           watermarks,
           processingContext,
+          drainMode,
           getWorkStreamLatencies) -> {};
   @Rule public transient Timeout globalTimeout = Timeout.seconds(600);
   private ManagedChannel inProcessChannel;
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index a86e6060955..d7ea039bb80 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -83,6 +83,7 @@ public class BoundedQueueExecutorTest {
                 new FakeGetDataClient(),
                 ignored -> {},
                 mock(HeartbeatSender.class)),
+            false,
             Instant::now),
         executeWorkFn);
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java
index 477c764a70e..5c3132ae471 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java
@@ -74,6 +74,7 @@ public class StreamingApplianceWorkCommitterTest {
               throw new UnsupportedOperationException();
             },
             mock(HeartbeatSender.class)),
+        false,
         Instant::now);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
index 5748b128f97..b4f63fa7161 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
@@ -104,6 +104,7 @@ public class StreamingEngineWorkCommitterTest {
               throw new UnsupportedOperationException();
             },
             mock(HeartbeatSender.class)),
+        false,
         Instant::now);
   }
 
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
index 41900017838..76883bebdac 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
@@ -70,6 +70,7 @@ public class GrpcDirectGetWorkStreamTest {
           serializedWorkItemSize,
           watermarks,
           processingContext,
+          drainMode,
           getWorkStreamLatencies) -> {};
   private static final Windmill.JobHeader TEST_JOB_HEADER =
       Windmill.JobHeader.newBuilder()
@@ -283,6 +284,7 @@ public class GrpcDirectGetWorkStreamTest {
                 serializedWorkItemSize,
                 watermarks,
                 processingContext,
+                drainMode,
                 getWorkStreamLatencies) -> {
               scheduledWorkItems.add(work);
             });
@@ -327,8 +329,12 @@ public class GrpcDirectGetWorkStreamTest {
         createGetWorkStream(
             testStub,
             initialBudget,
-            (work, serializedWorkItemSize, watermarks, processingContext, 
getWorkStreamLatencies) ->
-                scheduledWorkItems.add(work));
+            (work,
+                serializedWorkItemSize,
+                watermarks,
+                processingContext,
+                drainMode,
+                getWorkStreamLatencies) -> scheduledWorkItems.add(work));
     Windmill.WorkItem workItem =
         Windmill.WorkItem.newBuilder()
             .setKey(ByteString.copyFromUtf8("somewhat_long_key"))
@@ -365,6 +371,7 @@ public class GrpcDirectGetWorkStreamTest {
                 serializedWorkItemSize,
                 watermarks,
                 processingContext,
+                drainMode,
                 getWorkStreamLatencies) -> {
               scheduledWorkItems.add(work);
             });
@@ -408,6 +415,7 @@ public class GrpcDirectGetWorkStreamTest {
                 serializedWorkItemSize,
                 watermarks,
                 processingContext,
+                drainMode,
                 getWorkStreamLatencies) -> {
               scheduledWorkItems.add(work);
             });
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index e52b6e8de4b..d417d7d3417 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -336,6 +336,7 @@ public class GrpcWindmillServerTest {
             (String computation,
                 @Nullable Instant inputDataWatermark,
                 Instant synchronizedProcessingTime,
+                boolean drainMode,
                 WorkItem workItem,
                 long serializedWorkItemSize,
                 ImmutableList<LatencyAttribution> getWorkStreamLatencies) -> {
@@ -412,7 +413,8 @@ public class GrpcWindmillServerTest {
                                 ComputationWorkItemMetadata.newBuilder()
                                     .setComputationId("comp")
                                     .setDependentRealtimeInputWatermark(17000)
-                                    .setInputDataWatermark(18000));
+                                    .setInputDataWatermark(18000)
+                                    .setDrainMode(true));
                     int loopVariant = loop % 3;
                     if (loopVariant < 1) {
                       
responseChunk.addSerializedWorkItem(serializedResponses.pop());
@@ -469,12 +471,14 @@ public class GrpcWindmillServerTest {
             (String computation,
                 @Nullable Instant inputDataWatermark,
                 Instant synchronizedProcessingTime,
+                boolean drainMode,
                 WorkItem workItem,
                 long serializedWorkItemSize,
                 ImmutableList<LatencyAttribution> getWorkStreamLatencies) -> {
               assertEquals(inputDataWatermark, new Instant(18));
               assertEquals(synchronizedProcessingTime, new Instant(17));
               assertEquals(workItem.getKey(), 
ByteString.copyFromUtf8("somewhat_long_key"));
+              assertTrue(drainMode);
               assertTrue(sentResponseIds.containsKey(workItem.getWorkToken()));
               sentResponseIds.remove(workItem.getWorkToken());
               latch.countDown();
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
index f55549f7e2d..41f2230f4a8 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
@@ -95,6 +95,7 @@ public class WorkFailureProcessorTest {
                 new FakeGetDataClient(),
                 ignored -> {},
                 mock(HeartbeatSender.class)),
+            false,
             clock),
         processWorkFn);
   }
diff --git 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
index 115deccf6df..e8820971002 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
@@ -133,6 +133,7 @@ public class ActiveWorkRefresherTest {
             Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
             Work.createProcessingContext(
                 "computationId", new FakeGetDataClient(), ignored -> {}, 
heartbeatSender),
+            false,
             A_LONG_TIME_AGO),
         processWork);
   }

Reply via email to