This is an automated email from the ASF dual-hosted git repository.
kenn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 81bb5066bab [Java] Dataflow runner v1 - Propagate drain mode (#36534)
81bb5066bab is described below
commit 81bb5066bab883f79328e52f0d2a55e9b90f2f65
Author: Radosław Stankiewicz <[email protected]>
AuthorDate: Tue Dec 2 17:54:19 2025 +0100
[Java] Dataflow runner v1 - Propagate drain mode (#36534)
---
.../dataflow/worker/StreamingDataflowWorker.java | 2 +
.../worker/StreamingModeExecutionContext.java | 14 ++++-
.../dataflow/worker/UngroupedWindmillReader.java | 25 ++++++--
.../dataflow/worker/WindmillKeyedWorkItem.java | 26 ++++++--
.../dataflow/worker/WindmillTimerInternals.java | 7 ++-
.../dataflow/worker/WindowingWindmillReader.java | 3 +-
.../runners/dataflow/worker/streaming/Work.java | 11 +++-
.../harness/SingleSourceWorkerHarness.java | 3 +
.../client/grpc/GetWorkResponseChunkAssembler.java | 5 +-
.../client/grpc/GrpcDirectGetWorkStream.java | 1 +
.../windmill/client/grpc/GrpcGetWorkStream.java | 1 +
.../worker/windmill/work/WorkItemReceiver.java | 1 +
.../worker/windmill/work/WorkItemScheduler.java | 2 +
.../work/processing/StreamingWorkScheduler.java | 4 +-
.../dataflow/worker/FakeWindmillServer.java | 1 +
.../worker/StreamingDataflowWorkerTest.java | 2 +
.../worker/StreamingGroupAlsoByWindowFnsTest.java | 2 +-
...reamingGroupAlsoByWindowsReshuffleDoFnTest.java | 2 +-
.../worker/StreamingModeExecutionContextTest.java | 1 +
.../dataflow/worker/WindmillKeyedWorkItemTest.java | 72 +++++++++++++++++++++-
.../worker/WindmillTimerInternalsTest.java | 6 +-
.../dataflow/worker/WorkerCustomSourcesTest.java | 2 +
.../worker/streaming/ActiveWorkStateTest.java | 2 +
.../streaming/ComputationStateCacheTest.java | 1 +
.../FanOutStreamingEngineWorkerHarnessTest.java | 1 +
.../harness/WindmillStreamSenderTest.java | 1 +
.../worker/util/BoundedQueueExecutorTest.java | 1 +
.../StreamingApplianceWorkCommitterTest.java | 1 +
.../commits/StreamingEngineWorkCommitterTest.java | 1 +
.../client/grpc/GrpcDirectGetWorkStreamTest.java | 12 +++-
.../client/grpc/GrpcWindmillServerTest.java | 6 +-
.../failures/WorkFailureProcessorTest.java | 1 +
.../work/refresh/ActiveWorkRefresherTest.java | 1 +
33 files changed, 195 insertions(+), 26 deletions(-)
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 83e924514b5..aad27b86986 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -389,6 +389,7 @@ public final class StreamingDataflowWorker {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies) ->
computationStateCache
.get(processingContext.computationId())
@@ -401,6 +402,7 @@ public final class StreamingDataflowWorker {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies);
}),
ChannelCachingRemoteStubFactory.create(options.getGcpCredential(),
channelCache),
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index f5157bb4695..e3424e3d667 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -196,6 +196,10 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
return work != null && work.isFailed();
}
+ public boolean getDrainMode() {
+ return work != null ? work.getDrainMode() : false;
+ }
+
public boolean offsetBasedDeduplicationSupported() {
return activeReader != null
&& activeReader.getCurrentSource().offsetBasedDeduplicationSupported();
@@ -820,7 +824,10 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
timer, windowCoder))
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ timer,
+ windowCoder,
+ getDrainMode()))
.iterator();
}
@@ -880,7 +887,10 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
timer, windowCoder))
+ WindmillNamespacePrefix.USER_NAMESPACE_PREFIX,
+ timer,
+ windowCoder,
+ getDrainMode()))
.iterator());
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
index a9a033c89ad..c248259a12d 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/UngroupedWindmillReader.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.Map;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.dataflow.util.CloudObject;
import org.apache.beam.runners.dataflow.worker.util.common.worker.NativeReader;
import org.apache.beam.runners.dataflow.worker.windmill.Windmill;
@@ -117,8 +118,16 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
Collection<? extends BoundedWindow> windows =
WindmillSink.decodeMetadataWindows(windowsCoder,
message.getMetadata());
PaneInfo paneInfo =
WindmillSink.decodeMetadataPane(message.getMetadata());
+ /**
+ * https://s.apache.org/beam-drain-mode - propagate drain bit if
aggregation/expiry induced by
+ * drain happened upstream
+ */
+ boolean drainingValueFromUpstream = false;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
- WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
+ BeamFnApi.Elements.ElementMetadata elementMetadata =
+ WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
+ drainingValueFromUpstream =
+ elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING;
}
if (valueCoder instanceof KvCoder) {
KvCoder<?, ?> kvCoder = (KvCoder<?, ?>) valueCoder;
@@ -128,12 +137,18 @@ class UngroupedWindmillReader<T> extends
NativeReader<WindowedValue<T>> {
@SuppressWarnings("unchecked")
T result =
(T) KV.of(decode(kvCoder.getKeyCoder(), key),
decode(kvCoder.getValueCoder(), data));
- // todo #33176 propagate metadata to windowed value
- return WindowedValues.of(result, timestampMillis, windows, paneInfo);
+ return WindowedValues.of(
+ result, timestampMillis, windows, paneInfo, null, null,
drainingValueFromUpstream);
} else {
notifyElementRead(data.available() + metadata.available());
- // todo #33176 propagate metadata to windowed value
- return WindowedValues.of(decode(valueCoder, data), timestampMillis,
windows, paneInfo);
+ return WindowedValues.of(
+ decode(valueCoder, data),
+ timestampMillis,
+ windows,
+ paneInfo,
+ null,
+ null,
+ drainingValueFromUpstream);
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
index 6690377d3de..415dab526bb 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItem.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
+import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.KeyedWorkItemCoder;
import org.apache.beam.runners.core.TimerInternals.TimerData;
@@ -60,6 +61,8 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
private final Windmill.WorkItem workItem;
private final K key;
+ // Used to indicate that a timer was caused by drain.
+ private final boolean drainMode;
private final transient Coder<? extends BoundedWindow> windowCoder;
private final transient Coder<Collection<? extends BoundedWindow>>
windowsCoder;
@@ -70,12 +73,14 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
Windmill.WorkItem workItem,
Coder<? extends BoundedWindow> windowCoder,
Coder<Collection<? extends BoundedWindow>> windowsCoder,
- Coder<ElemT> valueCoder) {
+ Coder<ElemT> valueCoder,
+ boolean drainMode) {
this.key = key;
this.workItem = workItem;
this.windowCoder = windowCoder;
this.windowsCoder = windowsCoder;
this.valueCoder = valueCoder;
+ this.drainMode = drainMode;
}
@Override
@@ -93,7 +98,10 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
.transform(
timer ->
WindmillTimerInternals.windmillTimerToTimerData(
- WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX, timer,
windowCoder));
+ WindmillNamespacePrefix.SYSTEM_NAMESPACE_PREFIX,
+ timer,
+ windowCoder,
+ drainMode));
}
@Override
@@ -108,13 +116,21 @@ public class WindmillKeyedWorkItem<K, ElemT> implements
KeyedWorkItem<K, ElemT>
Collection<? extends BoundedWindow> windows =
WindmillSink.decodeMetadataWindows(windowsCoder,
message.getMetadata());
PaneInfo paneInfo =
WindmillSink.decodeMetadataPane(message.getMetadata());
+ /**
+ * https://s.apache.org/beam-drain-mode - propagate drain bit
if aggregation/expiry
+ * induced by drain happened upstream
+ */
+ boolean drainingValueFromUpstream = false;
if (WindowedValues.WindowedValueCoder.isMetadataSupported()) {
- WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
+ BeamFnApi.Elements.ElementMetadata elementMetadata =
+ WindmillSink.decodeAdditionalMetadata(windowsCoder,
message.getMetadata());
+ drainingValueFromUpstream =
+ elementMetadata.getDrain() ==
BeamFnApi.Elements.DrainMode.Enum.DRAINING;
}
InputStream inputStream = message.getData().newInput();
ElemT value = valueCoder.decode(inputStream,
Coder.Context.OUTER);
- // todo #33176 specify additional metadata in the future
- return WindowedValues.of(value, timestamp, windows, paneInfo);
+ return WindowedValues.of(
+ value, timestamp, windows, paneInfo, null, null,
drainingValueFromUpstream);
} catch (IOException e) {
throw new RuntimeException(e);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
index ee73ac138f0..8dac9d11715 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternals.java
@@ -301,7 +301,10 @@ class WindmillTimerInternals implements TimerInternals {
}
public static TimerData windmillTimerToTimerData(
- WindmillNamespacePrefix prefix, Timer timer, Coder<? extends
BoundedWindow> windowCoder) {
+ WindmillNamespacePrefix prefix,
+ Timer timer,
+ Coder<? extends BoundedWindow> windowCoder,
+ boolean draining) {
// The tag is a path-structure string but cheaper to parse than a proper
URI. It follows
// this pattern, where no component but the ID can contain a slash
@@ -395,6 +398,8 @@ class WindmillTimerInternals implements TimerInternals {
timestamp,
outputTimestamp,
timerTypeToTimeDomain(timer.getType()));
+ // TODO: propagate the draining flag into TimerData (https://github.com/apache/beam/issues/36884)
+
}
private static boolean useNewTimerTagEncoding(TimerData timerData) {
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
index d91a5412b91..f4a6eec61cb 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/WindowingWindmillReader.java
@@ -119,7 +119,8 @@ class WindowingWindmillReader<K, T> extends
NativeReader<WindowedValue<KeyedWork
final K key = keyCoder.decode(context.getSerializedKey().newInput(),
Coder.Context.OUTER);
final WorkItem workItem = context.getWorkItem();
KeyedWorkItem<K, T> keyedWorkItem =
- new WindmillKeyedWorkItem<>(key, workItem, windowCoder, windowsCoder,
valueCoder);
+ new WindmillKeyedWorkItem<>(
+ key, workItem, windowCoder, windowsCoder, valueCoder,
context.getDrainMode());
final boolean isEmptyWorkItem =
(Iterables.isEmpty(keyedWorkItem.timersIterable())
&& Iterables.isEmpty(keyedWorkItem.elementsIterable()));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
index 8b41a2d1321..43f355dd7ef 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/Work.java
@@ -78,12 +78,14 @@ public final class Work implements RefreshableWork {
private volatile TimedState currentState;
private volatile boolean isFailed;
private volatile String processingThreadName = "";
+ private final boolean drainMode;
private Work(
WorkItem workItem,
long serializedWorkItemSize,
Watermarks watermarks,
ProcessingContext processingContext,
+ boolean drainMode,
Supplier<Instant> clock) {
this.shardedKey = ShardedKey.create(workItem.getKey(),
workItem.getShardingKey());
this.workItem = workItem;
@@ -91,6 +93,7 @@ public final class Work implements RefreshableWork {
this.processingContext = processingContext;
this.watermarks = watermarks;
this.clock = clock;
+ this.drainMode = drainMode;
this.startTime = clock.get();
Preconditions.checkState(EMPTY_ENUM_MAP.isEmpty());
// Create by passing EMPTY_ENUM_MAP to avoid recreating
@@ -110,8 +113,10 @@ public final class Work implements RefreshableWork {
long serializedWorkItemSize,
Watermarks watermarks,
ProcessingContext processingContext,
+ boolean drainMode,
Supplier<Instant> clock) {
- return new Work(workItem, serializedWorkItemSize, watermarks,
processingContext, clock);
+ return new Work(
+ workItem, serializedWorkItemSize, watermarks, processingContext,
drainMode, clock);
}
public static ProcessingContext createProcessingContext(
@@ -207,6 +212,10 @@ public final class Work implements RefreshableWork {
return currentState.state();
}
+ public boolean getDrainMode() {
+ return drainMode;
+ }
+
public void setState(State state) {
Instant now = clock.get();
totalDurationPerState.compute(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
index 0de9d130b65..af7746d6902 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/harness/SingleSourceWorkerHarness.java
@@ -155,6 +155,7 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
(computationId,
inputDataWatermark,
synchronizedProcessingTime,
+ drainMode,
workItem,
serializedWorkItemSize,
getWorkStreamLatencies) ->
@@ -178,6 +179,7 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
getDataClient,
workCommitter::commit,
heartbeatSender),
+ drainMode,
getWorkStreamLatencies);
}));
try {
@@ -239,6 +241,7 @@ public final class SingleSourceWorkerHarness implements
StreamingWorkerHarness {
watermarks.setOutputDataWatermark(workItem.getOutputDataWatermark()).build(),
Work.createProcessingContext(
computationId, getDataClient, workCommitter::commit,
heartbeatSender),
+ computationWork.getDrainMode(),
/* getWorkStreamLatencies= */ ImmutableList.of());
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
index 0ebb4726d3a..3608bd1ccac 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
@@ -124,7 +124,8 @@ final class GetWorkResponseChunkAssembler {
metadataProto.getComputationId(),
WindmillTimeUtils.windmillToHarnessWatermark(metadataProto.getInputDataWatermark()),
WindmillTimeUtils.windmillToHarnessWatermark(
- metadataProto.getDependentRealtimeInputWatermark()));
+ metadataProto.getDependentRealtimeInputWatermark()),
+ metadataProto.getDrainMode());
}
abstract String computationId();
@@ -132,6 +133,8 @@ final class GetWorkResponseChunkAssembler {
abstract @Nullable Instant inputDataWatermark();
abstract @Nullable Instant synchronizedProcessingTime();
+
+ abstract boolean drainMode();
}
@AutoValue
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
index 2712bf1bd33..8eb4c51a2b4 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStream.java
@@ -281,6 +281,7 @@ final class GrpcDirectGetWorkStream
assembledWorkItem.bufferedSize(),
createWatermarks(workItem, metadata),
createProcessingContext(metadata.computationId()),
+ metadata.drainMode(),
assembledWorkItem.latencyAttributions());
budgetTracker.recordBudgetReceived(assembledWorkItem.bufferedSize());
GetWorkBudget extension = budgetTracker.computeBudgetExtension();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
index ae7ce85e13a..58407ad8147 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetWorkStream.java
@@ -203,6 +203,7 @@ final class GrpcGetWorkStream
assembledWorkItem.computationMetadata().computationId(),
assembledWorkItem.computationMetadata().inputDataWatermark(),
assembledWorkItem.computationMetadata().synchronizedProcessingTime(),
+ assembledWorkItem.computationMetadata().drainMode(),
assembledWorkItem.workItem(),
assembledWorkItem.bufferedSize(),
assembledWorkItem.latencyAttributions());
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
index e2f69585e48..71e524a308a 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemReceiver.java
@@ -30,6 +30,7 @@ public interface WorkItemReceiver {
String computation,
@Nullable Instant inputDataWatermark,
@Nullable Instant synchronizedProcessingTime,
+ boolean drainMode,
Windmill.WorkItem workItem,
long serializedWorkItemSize,
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
index b9d31fbe501..4121aa758ba 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/WorkItemScheduler.java
@@ -35,6 +35,7 @@ public interface WorkItemScheduler {
* @param workItem {@link WorkItem} to be processed.
* @param watermarks processing watermarks for the workItem.
* @param processingContext for processing the workItem.
+ * @param drainMode whether the job is draining.
* @param getWorkStreamLatencies Latencies per processing stage for the
WorkItem for reporting
* back to Streaming Engine backend.
*/
@@ -43,5 +44,6 @@ public interface WorkItemScheduler {
long serializedWorkItemSize,
Watermarks watermarks,
Work.ProcessingContext processingContext,
+ boolean drainMode,
ImmutableList<LatencyAttribution> getWorkStreamLatencies);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index e61c2d1f4a0..242e4a5f0db 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -210,10 +210,12 @@ public class StreamingWorkScheduler {
long serializedWorkItemSize,
Watermarks watermarks,
Work.ProcessingContext processingContext,
+ boolean drainMode,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) {
computationState.activateWork(
ExecutableWork.create(
- Work.create(workItem, serializedWorkItemSize, watermarks,
processingContext, clock),
+ Work.create(
+ workItem, serializedWorkItemSize, watermarks,
processingContext, drainMode, clock),
work -> processWork(computationState, work,
getWorkStreamLatencies)));
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
index a5c8909b8d0..1c5f7504bf3 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/FakeWindmillServer.java
@@ -275,6 +275,7 @@ public final class FakeWindmillServer extends
WindmillServerStub {
computationWork.getComputationId(),
inputDataWatermark,
Instant.now(),
+ computationWork.getDrainMode(),
workItem,
workItem.getSerializedSize(),
ImmutableList.of(
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
index df90bb96139..d11c6c37433 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorkerTest.java
@@ -372,6 +372,7 @@ public class StreamingDataflowWorkerTest {
Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
Work.createProcessingContext(
computationId, new FakeGetDataClient(), ignored -> {},
mock(HeartbeatSender.class)),
+ false,
Instant::now),
processWorkFn);
}
@@ -3624,6 +3625,7 @@ public class StreamingDataflowWorkerTest {
new FakeGetDataClient(),
ignored -> {},
mock(HeartbeatSender.class)),
+ false,
clock);
clock.sleep(Duration.millis(10));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
index 1182a2c0b9e..f7852ec1767 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowFnsTest.java
@@ -194,7 +194,7 @@ public class StreamingGroupAlsoByWindowFnsTest {
return new ValueInEmptyWindows<>(
(KeyedWorkItem<String, T>)
new WindmillKeyedWorkItem<>(
- KEY, workItem.build(), windowCoder, wildcardWindowsCoder,
valueCoder));
+ KEY, workItem.build(), windowCoder, wildcardWindowsCoder,
valueCoder, false));
}
@Test
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
index a348c0f0021..52c9844add8 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingGroupAlsoByWindowsReshuffleDoFnTest.java
@@ -132,7 +132,7 @@ public class StreamingGroupAlsoByWindowsReshuffleDoFnTest {
return new ValueInEmptyWindows<>(
(KeyedWorkItem<String, T>)
new WindmillKeyedWorkItem<>(
- KEY, workItem.build(), windowCoder, wildcardWindowsCoder,
valueCoder));
+ KEY, workItem.build(), windowCoder, wildcardWindowsCoder,
valueCoder, false));
}
@Test
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
index e216f912d77..93b279f0aec 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContextTest.java
@@ -143,6 +143,7 @@ public class StreamingModeExecutionContextTest {
watermarks,
Work.createProcessingContext(
COMPUTATION_ID, new FakeGetDataClient(), ignored -> {},
mock(HeartbeatSender.class)),
+ false,
Instant::now);
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
index 53a36722e41..bbdde449860 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillKeyedWorkItemTest.java
@@ -22,6 +22,7 @@ import static org.hamcrest.MatcherAssert.assertThat;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.KeyedWorkItem;
import org.apache.beam.runners.core.StateNamespace;
@@ -40,10 +41,12 @@ import
org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
+import org.apache.beam.sdk.values.WindowedValue;
import org.apache.beam.sdk.values.WindowedValues;
import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -93,7 +96,7 @@ public class WindmillKeyedWorkItemTest {
KeyedWorkItem<String, String> keyedWorkItem =
new WindmillKeyedWorkItem<>(
- KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER);
+ KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER,
false);
assertThat(
keyedWorkItem.elementsIterable(),
@@ -123,6 +126,24 @@ public class WindmillKeyedWorkItemTest {
.setMetadata(encodedMetadata);
}
+ private void addElementWithMetadata(
+ Windmill.InputMessageBundle.Builder chunk,
+ long timestamp,
+ String value,
+ IntervalWindow window,
+ PaneInfo pane,
+ BeamFnApi.Elements.ElementMetadata metadata)
+ throws IOException {
+ ByteString encodedMetadata =
+ WindmillSink.encodeMetadata(
+ WINDOWS_CODER, Collections.singletonList(window), pane, metadata);
+ chunk
+ .addMessagesBuilder()
+ .setTimestamp(WindmillTimeUtils.harnessToWindmillTimestamp(new
Instant(timestamp)))
+ .setData(ByteString.copyFromUtf8(value))
+ .setMetadata(encodedMetadata);
+ }
+
private PaneInfo paneInfo(int index) {
return PaneInfo.createPane(false, false, Timing.EARLY, index, -1);
}
@@ -148,7 +169,7 @@ public class WindmillKeyedWorkItemTest {
.build();
KeyedWorkItem<String, String> keyedWorkItem =
- new WindmillKeyedWorkItem<>(KEY, workItem, WINDOW_CODER,
WINDOWS_CODER, VALUE_CODER);
+ new WindmillKeyedWorkItem<>(KEY, workItem, WINDOW_CODER,
WINDOWS_CODER, VALUE_CODER, false);
assertThat(
keyedWorkItem.timersIterable(),
@@ -186,4 +207,51 @@ public class WindmillKeyedWorkItemTest {
FakeKeyedWorkItemCoder.of(
KvCoder.of(GlobalWindow.Coder.INSTANCE, GlobalWindow.Coder.INSTANCE)));
}
+
+ @Test
+ public void testDrainPropagated() throws Exception {
+ WindowedValues.FullWindowedValueCoder.setMetadataSupported();
+ Windmill.WorkItem.Builder workItem =
+ Windmill.WorkItem.newBuilder()
+ .setKey(SERIALIZED_KEY)
+ .setTimers(
+ Windmill.TimerBundle.newBuilder()
+ .addTimers(
+ makeSerializedTimer(STATE_NAMESPACE_2, 3, Windmill.Timer.Type.WATERMARK))
+ .build())
+ .setWorkToken(17);
+ Windmill.InputMessageBundle.Builder chunk1 = workItem.addMessageBundlesBuilder();
+ chunk1.setSourceComputationId("computation");
+ addElementWithMetadata(
+ chunk1,
+ 5,
+ "hello",
+ WINDOW_1,
+ paneInfo(0),
+ BeamFnApi.Elements.ElementMetadata.newBuilder()
+ .setDrain(BeamFnApi.Elements.DrainMode.Enum.DRAINING)
+ .build());
+ addElementWithMetadata(
+ chunk1,
+ 7,
+ "world",
+ WINDOW_2,
+ paneInfo(2),
+ BeamFnApi.Elements.ElementMetadata.newBuilder()
+ .setDrain(BeamFnApi.Elements.DrainMode.Enum.NOT_DRAINING)
+ .build());
+ KeyedWorkItem<String, String> keyedWorkItem =
+ new WindmillKeyedWorkItem<>(
+ KEY, workItem.build(), WINDOW_CODER, WINDOWS_CODER, VALUE_CODER, true);
+
+ Iterator<WindowedValue<String>> iterator = keyedWorkItem.elementsIterable().iterator();
+ Assert.assertTrue(iterator.next().causedByDrain());
+ Assert.assertFalse(iterator.next().causedByDrain());
+
+ // todo add assert for draining once timerdata is filled
+ // (https://github.com/apache/beam/issues/36884)
+ assertThat(
+ keyedWorkItem.timersIterable(),
+ Matchers.contains(makeTimer(STATE_NAMESPACE_2, 3, TimeDomain.EVENT_TIME)));
+ }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
index ec8672b6a75..4780cd768ef 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WindmillTimerInternalsTest.java
@@ -96,7 +96,8 @@ public class WindmillTimerInternalsTest {
WindmillTimerInternals.windmillTimerToTimerData(
prefix,
WindmillTimerInternals.timerDataToWindmillTimer(stateFamily, prefix, timer),
- coder);
+ coder,
+ false);
// The function itself bounds output, so we dont expect the original input as the
// output, we expect it to be bounded
TimerData expected =
@@ -145,7 +146,8 @@ public class WindmillTimerInternalsTest {
prefix,
WindmillTimerInternals.timerDataToWindmillTimer(
stateFamily, prefix, timer),
- coder),
+ coder,
+ false),
equalTo(expected));
}
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
index df3b959c82c..334b9414b26 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/WorkerCustomSourcesTest.java
@@ -204,6 +204,7 @@ public class WorkerCustomSourcesTest {
watermarks,
Work.createProcessingContext(
COMPUTATION_ID, new FakeGetDataClient(), ignored -> {}, mock(HeartbeatSender.class)),
+ false,
Instant::now);
}
@@ -1014,6 +1015,7 @@ public class WorkerCustomSourcesTest {
new FakeGetDataClient(),
ignored -> {},
mock(HeartbeatSender.class)),
+ false,
Instant::now);
context.start(
"key",
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
index c0cb8241d73..865ae261280 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkStateTest.java
@@ -71,6 +71,7 @@ public class ActiveWorkStateTest {
workItem.getSerializedSize(),
Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
createWorkProcessingContext(),
+ false,
Instant::now),
ignored -> {});
}
@@ -82,6 +83,7 @@ public class ActiveWorkStateTest {
workItem.getSerializedSize(),
Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
createWorkProcessingContext(),
+ false,
() -> Instant.EPOCH),
ignored -> {});
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java
index 935b25acb6f..1c8b8fca131 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/ComputationStateCacheTest.java
@@ -75,6 +75,7 @@ public class ComputationStateCacheTest {
new FakeGetDataClient(),
ignored -> {},
mock(HeartbeatSender.class)),
+ false,
Instant::now),
ignored -> {});
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
index 65e40f171b0..94c8f4b7595 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/FanOutStreamingEngineWorkerHarnessTest.java
@@ -128,6 +128,7 @@ public class FanOutStreamingEngineWorkerHarnessTest {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies) -> {};
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java
index b94270ad7bb..3217c736adb 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/streaming/harness/WindmillStreamSenderTest.java
@@ -68,6 +68,7 @@ public class WindmillStreamSenderTest {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies) -> {};
@Rule public transient Timeout globalTimeout = Timeout.seconds(600);
private ManagedChannel inProcessChannel;
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
index a86e6060955..d7ea039bb80 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/util/BoundedQueueExecutorTest.java
@@ -83,6 +83,7 @@ public class BoundedQueueExecutorTest {
new FakeGetDataClient(),
ignored -> {},
mock(HeartbeatSender.class)),
+ false,
Instant::now),
executeWorkFn);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java
index 477c764a70e..5c3132ae471 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingApplianceWorkCommitterTest.java
@@ -74,6 +74,7 @@ public class StreamingApplianceWorkCommitterTest {
throw new UnsupportedOperationException();
},
mock(HeartbeatSender.class)),
+ false,
Instant::now);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
index 5748b128f97..b4f63fa7161 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/commits/StreamingEngineWorkCommitterTest.java
@@ -104,6 +104,7 @@ public class StreamingEngineWorkCommitterTest {
throw new UnsupportedOperationException();
},
mock(HeartbeatSender.class)),
+ false,
Instant::now);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
index 41900017838..76883bebdac 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcDirectGetWorkStreamTest.java
@@ -70,6 +70,7 @@ public class GrpcDirectGetWorkStreamTest {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies) -> {};
private static final Windmill.JobHeader TEST_JOB_HEADER =
Windmill.JobHeader.newBuilder()
@@ -283,6 +284,7 @@ public class GrpcDirectGetWorkStreamTest {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies) -> {
scheduledWorkItems.add(work);
});
@@ -327,8 +329,12 @@ public class GrpcDirectGetWorkStreamTest {
createGetWorkStream(
testStub,
initialBudget,
- (work, serializedWorkItemSize, watermarks, processingContext, getWorkStreamLatencies) ->
- scheduledWorkItems.add(work));
+ (work,
+ serializedWorkItemSize,
+ watermarks,
+ processingContext,
+ drainMode,
+ getWorkStreamLatencies) -> scheduledWorkItems.add(work));
Windmill.WorkItem workItem =
Windmill.WorkItem.newBuilder()
.setKey(ByteString.copyFromUtf8("somewhat_long_key"))
@@ -365,6 +371,7 @@ public class GrpcDirectGetWorkStreamTest {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies) -> {
scheduledWorkItems.add(work);
});
@@ -408,6 +415,7 @@ public class GrpcDirectGetWorkStreamTest {
serializedWorkItemSize,
watermarks,
processingContext,
+ drainMode,
getWorkStreamLatencies) -> {
scheduledWorkItems.add(work);
});
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
index e52b6e8de4b..d417d7d3417 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcWindmillServerTest.java
@@ -336,6 +336,7 @@ public class GrpcWindmillServerTest {
(String computation,
@Nullable Instant inputDataWatermark,
Instant synchronizedProcessingTime,
+ boolean drainMode,
WorkItem workItem,
long serializedWorkItemSize,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) -> {
@@ -412,7 +413,8 @@ public class GrpcWindmillServerTest {
ComputationWorkItemMetadata.newBuilder()
.setComputationId("comp")
.setDependentRealtimeInputWatermark(17000)
- .setInputDataWatermark(18000));
+ .setInputDataWatermark(18000)
+ .setDrainMode(true));
int loopVariant = loop % 3;
if (loopVariant < 1) {
responseChunk.addSerializedWorkItem(serializedResponses.pop());
@@ -469,12 +471,14 @@ public class GrpcWindmillServerTest {
(String computation,
@Nullable Instant inputDataWatermark,
Instant synchronizedProcessingTime,
+ boolean drainMode,
WorkItem workItem,
long serializedWorkItemSize,
ImmutableList<LatencyAttribution> getWorkStreamLatencies) -> {
assertEquals(inputDataWatermark, new Instant(18));
assertEquals(synchronizedProcessingTime, new Instant(17));
assertEquals(workItem.getKey(), ByteString.copyFromUtf8("somewhat_long_key"));
+ assertTrue(drainMode);
assertTrue(sentResponseIds.containsKey(workItem.getWorkToken()));
sentResponseIds.remove(workItem.getWorkToken());
latch.countDown();
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
index f55549f7e2d..41f2230f4a8 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/failures/WorkFailureProcessorTest.java
@@ -95,6 +95,7 @@ public class WorkFailureProcessorTest {
new FakeGetDataClient(),
ignored -> {},
mock(HeartbeatSender.class)),
+ false,
clock),
processWorkFn);
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
index 115deccf6df..e8820971002 100644
--- a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/refresh/ActiveWorkRefresherTest.java
@@ -133,6 +133,7 @@ public class ActiveWorkRefresherTest {
Watermarks.builder().setInputDataWatermark(Instant.EPOCH).build(),
Work.createProcessingContext(
"computationId", new FakeGetDataClient(), ignored -> {}, heartbeatSender),
+ false,
A_LONG_TIME_AGO),
processWork);
}