This is an automated email from the ASF dual-hosted git repository.
scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 2209bd60122 Adds bundleFinalizer support to Dataflow non-portable
worker. (#37723)
2209bd60122 is described below
commit 2209bd6012287dadbf18d0e5fd521de1d7a2739d
Author: Andrew Crites <[email protected]>
AuthorDate: Fri Mar 20 01:49:42 2026 -0700
Adds bundleFinalizer support to Dataflow non-portable worker. (#37723)
* Adds bundleFinalizer support to non-portable worker.
* Removes the check preventing stateful DoFns with bundle finalizers from
running on the Dataflow streaming non-portable worker when using Streaming Engine
---
...m_PostCommit_Java_ValidatesRunner_Dataflow.json | 4 +-
...it_Java_ValidatesRunner_Dataflow_Streaming.json | 2 +-
.../apache/beam/runners/core/SimpleDoFnRunner.java | 16 +-
runners/google-cloud-dataflow-java/build.gradle | 7 +-
.../beam/runners/dataflow/DataflowRunner.java | 6 +-
.../worker/SplittableProcessFnFactory.java | 5 +-
.../worker/StreamingModeExecutionContext.java | 74 +++++++--
.../client/grpc/GetWorkResponseChunkAssembler.java | 9 +-
.../work/processing/StreamingCommitFinalizer.java | 178 +++++++++++++++++----
.../work/processing/StreamingWorkScheduler.java | 1 +
.../runners/dataflow/worker/SimpleParDoFnTest.java | 111 ++++++++++++-
.../processing/StreamingCommitFinalizerTest.java | 177 ++++++++++++++++++++
.../worker/windmill/src/main/proto/windmill.proto | 17 +-
13 files changed, 544 insertions(+), 63 deletions(-)
diff --git
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
index a89f7adb4ce..e9d869cc508 100644
--- a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow.json
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run!",
- "modification": 3,
+ "modification": 1,
}
-
+
diff --git
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
index e623d3373a9..0c41d2bcf2f 100644
---
a/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
+++
b/.github/trigger_files/beam_PostCommit_Java_ValidatesRunner_Dataflow_Streaming.json
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to
run!",
- "modification": 1,
+ "modification": 6,
}
diff --git
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index 74f5a4d0900..db24215e32c 100644
---
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -311,9 +311,14 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
public String getErrorContext() {
return "SimpleDoFnRunner/StartBundle";
}
+
+ @Override
+ public BundleFinalizer bundleFinalizer() {
+ return stepContext.bundleFinalizer();
+ }
}
- /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.StartBundle
@StartBundle}. */
+ /** An {@link DoFnInvoker.ArgumentProvider} for {@link DoFn.FinishBundle
@FinishBundle}. */
private class DoFnFinishBundleArgumentProvider
extends DoFnInvoker.BaseArgumentProvider<InputT, OutputT> {
/** A concrete implementation of {@link DoFn.FinishBundleContext}. */
@@ -356,6 +361,11 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
public String getErrorContext() {
return "SimpleDoFnRunner/FinishBundle";
}
+
+ @Override
+ public BundleFinalizer bundleFinalizer() {
+ return stepContext.bundleFinalizer();
+ }
}
/**
@@ -1030,7 +1040,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
- "Bundle finalization is not supported in non-portable pipelines.");
+ "Bundle finalization is not supported in OnTimer calls.");
}
}
@@ -1289,7 +1299,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements
DoFnRunner<InputT, Out
@Override
public BundleFinalizer bundleFinalizer() {
throw new UnsupportedOperationException(
- "Bundle finalization is not supported in non-portable pipelines.");
+ "Bundle finalization is not supported in OnWindowExpiration calls.");
}
}
diff --git a/runners/google-cloud-dataflow-java/build.gradle
b/runners/google-cloud-dataflow-java/build.gradle
index 49de59fac32..652c72c323e 100644
--- a/runners/google-cloud-dataflow-java/build.gradle
+++ b/runners/google-cloud-dataflow-java/build.gradle
@@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesTestStream',
'org.apache.beam.sdk.testing.UsesMetricsPusher',
- 'org.apache.beam.sdk.testing.UsesBundleFinalizer',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of
now does not support returning back BoundedTrie in metric result.
]
@@ -456,7 +455,9 @@ task validatesRunner {
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInSetupStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful',
- ]
+ ],
+ // Batch legacy worker does not support bundle finalization.
+ excludedCategories: [ 'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
))
}
@@ -490,6 +491,8 @@ task validatesRunnerStreaming {
description "Validates Dataflow runner forcing streaming mode"
dependsOn(createLegacyWorkerValidatesRunnerTest(validatesRunnerStreamingConfig
+ [
name: 'validatesRunnerLegacyWorkerTestStreaming',
+ // Streaming appliance currently fails bundle finalizer tests.
+ excludedCategories: validatesRunnerStreamingConfig.excludedCategories + [
'org.apache.beam.sdk.testing.UsesBundleFinalizer', ],
]))
}
diff --git
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 9d963af0ecb..8bda9c20763 100644
---
a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++
b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2737,11 +2737,11 @@ public class DataflowRunner extends
PipelineRunner<DataflowPipelineJob> {
DataflowRunner.class.getSimpleName()));
}
boolean isUnifiedWorker = useUnifiedWorker(options);
- if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker) {
+ if (DoFnSignatures.usesBundleFinalizer(fn) && !isUnifiedWorker &&
!streaming) {
throw new UnsupportedOperationException(
String.format(
- "%s does not currently support %s when not using unified worker
because it uses "
- + "BundleFinalizers in its implementation. Set the
`--experiments=use_runner_v2` "
+ "%s does not currently support %s in batch mode when not using
unified worker because it "
+ + "uses BundleFinalizers in its implementation. Set the
`--experiments=use_runner_v2` "
+ "option to use this DoFn.",
DataflowRunner.class.getSimpleName(),
fn.getClass().getSimpleName()));
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
index 4473dff8e94..93c288fea9e 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/SplittableProcessFnFactory.java
@@ -156,10 +156,7 @@ class SplittableProcessFnFactory {
// in the event of a crash.
10000,
Duration.standardSeconds(10),
- () -> {
- throw new UnsupportedOperationException(
- "BundleFinalizer unsupported by non-portable Dataflow.");
- }));
+ stepContext::bundleFinalizer));
DoFnRunner<KeyedWorkItem<byte[], KV<InputT, RestrictionT>>, OutputT>
simpleRunner =
new SimpleDoFnRunner<>(
options,
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
index b4568877143..f75d452b211 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingModeExecutionContext.java
@@ -35,6 +35,7 @@ import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.concurrent.NotThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.StateInternals;
import org.apache.beam.runners.core.StateNamespace;
@@ -73,6 +74,7 @@ import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.metrics.MetricsContainer;
import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.values.PCollectionView;
@@ -82,8 +84,10 @@ import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.Vi
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.FluentIterable;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.HashBasedTable;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterators;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.PeekingIterator;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
@@ -446,11 +450,27 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
}
}
- public Map<Long, Runnable> flushState() {
- Map<Long, Runnable> callbacks = new HashMap<>();
+ public Map<Long, Pair<Instant, Runnable>> flushState() {
+ Map<Long, Pair<Instant, Runnable>> callbacks = new HashMap<>();
for (StepContext stepContext : getAllStepContexts()) {
stepContext.flushState();
+ for (Pair<Instant, BundleFinalizer.Callback> bundleFinalizer :
+ stepContext.flushBundleFinalizerCallbacks()) {
+ long id = ThreadLocalRandom.current().nextLong();
+ callbacks.put(
+ id,
+ Pair.of(
+ bundleFinalizer.getLeft(),
+ () -> {
+ try {
+ bundleFinalizer.getRight().onBundleSuccess();
+ } catch (Exception e) {
+ throw new RuntimeException("Exception while running bundle
finalizer", e);
+ }
+ }));
+ outputBuilder.addFinalizeIds(id);
+ }
}
if (activeReader != null) {
@@ -462,13 +482,15 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
sourceStateBuilder.addFinalizeIds(id);
callbacks.put(
id,
- () -> {
- try {
- checkpointMark.finalizeCheckpoint();
- } catch (IOException e) {
- throw new RuntimeException("Exception while finalizing
checkpoint", e);
- }
- });
+ Pair.of(
+ Instant.now().plus(Duration.standardMinutes(5)),
+ () -> {
+ try {
+ checkpointMark.finalizeCheckpoint();
+ } catch (IOException e) {
+ throw new RuntimeException("Exception while finalizing
checkpoint", e);
+ }
+ }));
@SuppressWarnings("unchecked")
Coder<UnboundedSource.CheckpointMark> checkpointCoder =
@@ -699,6 +721,11 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
public DataflowStepContext namespacedToUser() {
return this;
}
+
+ @Override
+ public BundleFinalizer bundleFinalizer() {
+ return wrapped.bundleFinalizer();
+ }
}
/** A {@link SideInputReader} that fetches side inputs from the streaming
worker's cache. */
@@ -771,6 +798,7 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
// A list of timer keys that were modified by user processing earlier in
this bundle. This
// serves a tombstone, so that we know not to fire any bundle timers that
were modified.
private Table<String, StateNamespace, TimerData> modifiedUserTimerKeys =
null;
+ private final WindmillBundleFinalizer bundleFinalizer = new
WindmillBundleFinalizer();
public StepContext(DataflowOperationContext operationContext) {
super(operationContext.nameContext());
@@ -1043,9 +1071,37 @@ public class StreamingModeExecutionContext extends
DataflowExecutionContext<Step
return checkNotNull(systemTimerInternals);
}
+ @Override
+ public BundleFinalizer bundleFinalizer() {
+ return bundleFinalizer;
+ }
+
public TimerInternals userTimerInternals() {
ensureStateful("Tried to access user timers");
return checkNotNull(userTimerInternals);
}
+
+ public ImmutableList<Pair<Instant, BundleFinalizer.Callback>>
flushBundleFinalizerCallbacks() {
+ return bundleFinalizer.flushCallbacks();
+ }
+ }
+
+ private static class WindmillBundleFinalizer implements BundleFinalizer {
+ private ImmutableList.Builder<Pair<Instant, Callback>> callbacks =
ImmutableList.builder();
+
+ private WindmillBundleFinalizer() {}
+
+ private ImmutableList<Pair<Instant, Callback>> flushCallbacks() {
+ ImmutableList<Pair<Instant, Callback>> flushedCallbacks =
callbacks.build();
+ if (!Iterables.isEmpty(flushedCallbacks)) {
+ callbacks = ImmutableList.builder();
+ }
+ return flushedCallbacks;
+ }
+
+ @Override
+ public void afterBundleCommit(Instant callbackExpiry, Callback callback) {
+ callbacks.add(Pair.of(callbackExpiry, callback));
+ }
}
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
index 3608bd1ccac..4afee8a70df 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GetWorkResponseChunkAssembler.java
@@ -54,6 +54,7 @@ final class GetWorkResponseChunkAssembler {
private final WorkItem.Builder workItemBuilder; // Reused to reduce GC
overhead.
private ByteString data;
private long bufferedSize;
+ private final List<Long> appliedFinalizeIds;
GetWorkResponseChunkAssembler() {
workTimingInfosTracker = new
GetWorkTimingInfosTracker(System::currentTimeMillis);
@@ -61,10 +62,11 @@ final class GetWorkResponseChunkAssembler {
bufferedSize = 0;
metadata = null;
workItemBuilder = WorkItem.newBuilder();
+ appliedFinalizeIds = new ArrayList<>();
}
/**
- * Appends the response chunk bytes to the {@link #data }byte buffer. Return
the assembled
+ * Appends the response chunk bytes to the {@link #data} byte buffer. Return
the assembled
* WorkItem if all response chunks for a WorkItem have been received.
*/
List<AssembledWorkItem> append(Windmill.StreamingGetWorkResponseChunk chunk)
{
@@ -72,6 +74,7 @@ final class GetWorkResponseChunkAssembler {
metadata = ComputationMetadata.fromProto(chunk.getComputationMetadata());
}
workTimingInfosTracker.addTimingInfo(chunk.getPerWorkItemTimingInfosList());
+ appliedFinalizeIds.addAll(chunk.getAppliedFinalizeIdsList());
List<AssembledWorkItem> response = new ArrayList<>();
for (int i = 0; i < chunk.getSerializedWorkItemList().size(); i++) {
@@ -90,13 +93,14 @@ final class GetWorkResponseChunkAssembler {
}
/**
- * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ it's
metadata. Resets the
+ * Attempt to flush the {@link #data} bytes into a {@link WorkItem} w/ its
metadata. Resets the
* data byte string and tracking metadata afterwards, whether the {@link
WorkItem} deserialization
* was successful or not.
*/
private Optional<AssembledWorkItem> flushToWorkItem() {
try {
workItemBuilder.mergeFrom(data);
+ workItemBuilder.addAllAppliedFinalizeIds(appliedFinalizeIds);
return Optional.of(
AssembledWorkItem.create(
workItemBuilder.build(),
@@ -110,6 +114,7 @@ final class GetWorkResponseChunkAssembler {
workTimingInfosTracker.reset();
data = ByteString.EMPTY;
bufferedSize = 0;
+ appliedFinalizeIds.clear();
}
return Optional.empty();
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
index 7116aed3a2b..4266f11f50c 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizer.java
@@ -17,14 +17,28 @@
*/
package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
-import java.time.Duration;
+import static
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;
+
+import com.google.auto.value.AutoValue;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
-import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,28 +46,110 @@ import org.slf4j.LoggerFactory;
@Internal
final class StreamingCommitFinalizer {
private static final Logger LOG =
LoggerFactory.getLogger(StreamingCommitFinalizer.class);
- private static final Duration DEFAULT_CACHE_ENTRY_EXPIRY =
Duration.ofMinutes(5L);
- private final Cache<Long, Runnable> commitFinalizerCache;
+
+ /** A {@link Runnable} and expiry time pair. */
+ @AutoValue
+ public abstract static class FinalizationInfo {
+ public abstract long getId();
+
+ public abstract Instant getExpiryTime();
+
+ public abstract Runnable getCallback();
+
+ public static FinalizationInfo create(Long id, Instant expiryTime,
Runnable callback) {
+ return new AutoValue_StreamingCommitFinalizer_FinalizationInfo(id,
expiryTime, callback);
+ }
+ }
+
+ private final ReentrantLock lock = new ReentrantLock();
+ private final Condition queueMinChanged = lock.newCondition();
+
+ @GuardedBy("lock")
+ private final HashMap<Long, FinalizationInfo> commitFinalizationCallbacks =
new HashMap<>();
+
+ @GuardedBy("lock")
+ private final PriorityQueue<FinalizationInfo> cleanUpQueue =
+ new PriorityQueue<>(11,
Comparator.comparing(FinalizationInfo::getExpiryTime));
+
+ @GuardedBy("lock")
+ private boolean cleanUpThreadStarted = false;
+
private final BoundedQueueExecutor finalizationExecutor;
- private StreamingCommitFinalizer(
- Cache<Long, Runnable> commitFinalizerCache, BoundedQueueExecutor
finalizationExecutor) {
- this.commitFinalizerCache = commitFinalizerCache;
- this.finalizationExecutor = finalizationExecutor;
+ private StreamingCommitFinalizer(BoundedQueueExecutor
finalizationCleanupExecutor) {
+ finalizationExecutor = finalizationCleanupExecutor;
+ }
+
+ private void cleanupThreadBody() {
+ lock.lock();
+ try {
+ while (true) {
+ final @Nullable FinalizationInfo minValue = cleanUpQueue.peek();
+ if (minValue == null) {
+ // Wait for an element to be added and loop to re-examine the min.
+ queueMinChanged.await();
+ continue;
+ }
+
+ Instant now = Instant.now();
+ Duration timeDifference = new Duration(now, minValue.getExpiryTime());
+ if (timeDifference.getMillis() <= 0
+ || (!queueMinChanged.await(timeDifference.getMillis(), TimeUnit.MILLISECONDS)
+ && cleanUpQueue.peek() == minValue)) {
+ // Min is due: already expired, or the timed await ran out (await is false only on
+ // timeout; true means a signal or spurious wakeup) and it is still the minimum.
+ checkState(minValue == cleanUpQueue.poll());
+ checkState(commitFinalizationCallbacks.remove(minValue.getId()) == minValue);
+ }
+ }
+ } catch (InterruptedException e) {
+ // We're being shutdown.
+ } finally {
+ lock.unlock();
+ }
 }
static StreamingCommitFinalizer create(BoundedQueueExecutor workExecutor) {
- return new StreamingCommitFinalizer(
-
CacheBuilder.newBuilder().expireAfterWrite(DEFAULT_CACHE_ENTRY_EXPIRY).build(),
- workExecutor);
+ return new StreamingCommitFinalizer(workExecutor);
}
/**
* Stores a map of user worker generated finalization ids and callbacks to
execute once a commit
* has been successfully committed to the backing state store.
*/
- void cacheCommitFinalizers(Map<Long, Runnable> commitCallbacks) {
- commitFinalizerCache.putAll(commitCallbacks);
+ public void cacheCommitFinalizers(Map<Long, Pair<Instant, Runnable>>
callbacks) {
+ for (Map.Entry<Long, Pair<Instant, Runnable>> entry :
callbacks.entrySet()) {
+ Long finalizeId = entry.getKey();
+ final FinalizationInfo info =
+ FinalizationInfo.create(
+ finalizeId, entry.getValue().getLeft(),
entry.getValue().getRight());
+
+ lock.lock();
+ try {
+ FinalizationInfo existingInfo =
commitFinalizationCallbacks.put(finalizeId, info);
+ if (existingInfo != null) {
+ throw new IllegalStateException(
+ "Expected to not have any past callbacks for bundle "
+ + finalizeId
+ + " but had "
+ + existingInfo);
+ }
+ if (!cleanUpThreadStarted) {
+ // Start the cleanup thread lazily for pipelines that don't use
finalization callbacks
+ // and some tests.
+ cleanUpThreadStarted = true;
+ finalizationExecutor.execute(this::cleanupThreadBody, 0);
+ }
+ cleanUpQueue.add(info);
+ @SuppressWarnings("ReferenceEquality")
+ boolean newMin = cleanUpQueue.peek() == info;
+ if (newMin) {
+ queueMinChanged.signal();
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
}
/**
@@ -61,27 +157,47 @@ final class StreamingCommitFinalizer {
 * successfully persisted in the backing state store. If the commitCallback for the finalizationId
 * is still cached it is invoked.
 */
- void finalizeCommits(Iterable<Long> finalizeIds) {
- for (long finalizeId : finalizeIds) {
- @Nullable Runnable finalizeCommit = commitFinalizerCache.getIfPresent(finalizeId);
- // NOTE: It is possible the same callback id may be removed twice if
- // windmill restarts.
- // TODO: It is also possible for an earlier finalized id to be lost.
- // We should automatically discard all older callbacks for the same computation and key.
- if (finalizeCommit != null) {
- commitFinalizerCache.invalidate(finalizeId);
- finalizationExecutor.forceExecute(
- () -> {
- try {
- finalizeCommit.run();
- } catch (OutOfMemoryError oom) {
- throw oom;
- } catch (Throwable t) {
- LOG.error("Source checkpoint finalization failed:", t);
- }
- },
- 0);
+ public void finalizeCommits(Iterable<Long> finalizeIds) {
+ if (Iterables.isEmpty(finalizeIds)) {
+ return;
+ }
+ List<Runnable> callbacksToExecute = new ArrayList<>();
+ lock.lock();
+ try {
+ for (long finalizeId : finalizeIds) {
+ @Nullable FinalizationInfo info = commitFinalizationCallbacks.remove(finalizeId);
+ if (info != null) {
+ checkState(cleanUpQueue.remove(info));
+ callbacksToExecute.add(info.getCallback());
+ }
+ }
+ } finally {
+ lock.unlock();
+ }
+ // Run callbacks outside the lock. Failures are caught inside the submitted task so that a
+ // throwing user callback is logged instead of escaping on an executor thread.
+ for (Runnable callback : callbacksToExecute) {
+ finalizationExecutor.forceExecute(
+ () -> {
+ try {
+ callback.run();
+ } catch (OutOfMemoryError oom) {
+ throw oom;
+ } catch (Throwable t) {
+ LOG.error("Commit finalization failed:", t);
 }
+ },
+ 0);
 }
 }
+
+ @VisibleForTesting
+ int cleanupQueueSize() {
+ lock.lock();
+ try {
+ return cleanUpQueue.size();
+ } finally {
+ lock.unlock();
+ }
+ }
}
diff --git
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
index bb936831d6e..b0f3a289902 100644
---
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
+++
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingWorkScheduler.java
@@ -246,6 +246,7 @@ public class StreamingWorkScheduler {
// Before any processing starts, call any pending OnCommit callbacks.
Nothing that requires
// cleanup should be done before this, since we might exit early here.
commitFinalizer.finalizeCommits(workItem.getSourceState().getFinalizeIdsList());
+ commitFinalizer.finalizeCommits(workItem.getAppliedFinalizeIdsList());
if (workItem.getSourceState().getOnlyFinalize()) {
Windmill.WorkItemCommitRequest.Builder outputBuilder =
initializeOutputBuilder(key, workItem);
outputBuilder.setSourceStateUpdates(Windmill.SourceState.newBuilder().setOnlyFinalize(true));
diff --git
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
index 9e45425562a..9c9f5386f44 100644
---
a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
+++
b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/SimpleParDoFnTest.java
@@ -36,20 +36,24 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.runners.core.NullSideInputReader;
import org.apache.beam.runners.core.SideInputReader;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
+import
org.apache.beam.runners.dataflow.worker.DataflowExecutionContext.DataflowStepContext;
import
org.apache.beam.runners.dataflow.worker.counters.CounterFactory.CounterDistribution;
import org.apache.beam.runners.dataflow.worker.counters.CounterName;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import
org.apache.beam.runners.dataflow.worker.counters.DataflowCounterUpdateExtractor;
import org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoFn;
import org.apache.beam.runners.dataflow.worker.util.common.worker.Receiver;
+import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFn.BundleFinalizer;
import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
@@ -62,16 +66,20 @@ import org.apache.beam.sdk.values.WindowingStrategy;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+import org.mockito.Mockito;
/** Tests for {@link SimpleParDoFn}. */
@RunWith(JUnit4.class)
public class SimpleParDoFnTest {
+
@Rule public ExpectedException thrown = ExpectedException.none();
private PipelineOptions options;
@@ -95,6 +103,7 @@ public class SimpleParDoFnTest {
// TODO: Replace TestDoFn usages with a mock DoFn to reduce boilerplate.
static class TestDoFn extends DoFn<Integer, String> {
+
enum State {
UNSTARTED,
SET_UP,
@@ -156,6 +165,7 @@ public class SimpleParDoFnTest {
}
static class TestErrorDoFn extends DoFn<Integer, String> {
+
// Used to test nested stack traces.
private void nestedFunctionBeta(String s) {
throw new RuntimeException(s);
@@ -182,6 +192,7 @@ public class SimpleParDoFnTest {
}
static class TestReceiver implements Receiver {
+
List<Object> receivedElems = new ArrayList<>();
@Override
@@ -563,10 +574,10 @@ public class SimpleParDoFnTest {
* @param inputData Input elements to process. For each element X, the DoFn
will output a string
* repeated X times.
* @return Delta counter updates extracted after execution.
- * @throws Exception
*/
private List<CounterUpdate> executeParDoFnCounterTest(int... inputData)
throws Exception {
class RepeaterDoFn extends DoFn<Integer, String> {
+
/** Takes as input the number of times to output a message. */
@ProcessElement
public void processElement(ProcessContext c) {
@@ -616,6 +627,7 @@ public class SimpleParDoFnTest {
* conversion according to the {@link PCollectionView} and projection to a
particular window.
*/
private static class EmptySideInputReader implements SideInputReader {
+
private EmptySideInputReader() {}
@Override
@@ -633,4 +645,101 @@ public class SimpleParDoFnTest {
throw new IllegalArgumentException("calling getSideInput() with unknown
view");
}
}
+
+  /**
+   * Verifies that the {@link BundleFinalizer} returned by the user step context is handed to
+   * {@code @StartBundle}, {@code @ProcessElement} and {@code @FinishBundle}. The stubbed finalizer
+   * below runs each callback as soon as it is registered, so the DoFn's counters record one
+   * registration per lifecycle invocation.
+   */
+  @Test
+  public void testBundleFinalizer() throws Exception {
+    // The DoFn's counters are static; reset them before this run.
+    WithBundleFinalizerDoFn.startBundleCount.set(0);
+    WithBundleFinalizerDoFn.processElementCount.set(0);
+    WithBundleFinalizerDoFn.finishBundleCount.set(0);
+    DoFnInfo<Long, String> fnInfo =
+        DoFnInfo.forFn(
+            new WithBundleFinalizerDoFn(),
+            WindowingStrategy.globalDefault(),
+            null /* side input views */,
+            VarLongCoder.of(),
+            MAIN_OUTPUT,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap());
+
+    // User-namespaced step context: bundleFinalizer() returns a finalizer that invokes each
+    // callback immediately; every other call is forwarded to the shared stepContext.
+    DataflowExecutionContext.DataflowStepContext userStepContext =
+        Mockito.mock(
+            DataflowExecutionContext.DataflowStepContext.class,
+            invocation -> {
+              if (invocation.getMethod().getName().equals("bundleFinalizer")) {
+                return new BundleFinalizer() {
+                  @Override
+                  public void afterBundleCommit(Instant expiry, Callback callback) {
+                    try {
+                      callback.onBundleSuccess();
+                    } catch (Exception e) {
+                      throw new RuntimeException(e);
+                    }
+                  }
+                };
+              }
+              return forwardToStepContext(invocation);
+            });
+
+    // Step context whose namespacedToUser() returns the user context above; every other call is
+    // forwarded to the shared stepContext.
+    DataflowStepContext stepContextWithBundleFinalizer =
+        Mockito.mock(
+            DataflowStepContext.class,
+            invocation -> {
+              if (invocation.getMethod().getName().equals("namespacedToUser")) {
+                return userStepContext;
+              }
+              return forwardToStepContext(invocation);
+            });
+
+    ParDoFn parDoFn =
+        new SimpleParDoFn<>(
+            options,
+            DoFnInstanceManagers.singleInstance(fnInfo),
+            new EmptySideInputReader(),
+            MAIN_OUTPUT,
+            ImmutableMap.of(MAIN_OUTPUT, 0),
+            stepContextWithBundleFinalizer,
+            operationContext,
+            DoFnSchemaInformation.create(),
+            Collections.emptyMap(),
+            SimpleDoFnRunnerFactory.INSTANCE);
+
+    parDoFn.startBundle(new TestReceiver());
+
+    // Process a few elements; each one registers a callback from @ProcessElement.
+    for (int i = 0; i < 5; i++) {
+      parDoFn.processElement(WindowedValues.valueInGlobalWindow(1L));
+    }
+
+    parDoFn.finishBundle();
+
+    assertThat(WithBundleFinalizerDoFn.startBundleCount.get(), equalTo(1));
+    assertThat(WithBundleFinalizerDoFn.processElementCount.get(), equalTo(5));
+    assertThat(WithBundleFinalizerDoFn.finishBundleCount.get(), equalTo(1));
+  }
+
+  /**
+   * Reflectively forwards a mocked call to {@code stepContext}.
+   *
+   * <p>{@code Method.invoke} wraps anything the delegate throws in an {@link
+   * java.lang.reflect.InvocationTargetException}. This helper rethrows the original cause so a
+   * failure inside the delegate surfaces as-is rather than as a reflective wrapper.
+   */
+  private Object forwardToStepContext(org.mockito.invocation.InvocationOnMock invocation)
+      throws Throwable {
+    try {
+      return invocation.getMethod().invoke(stepContext, invocation.getArguments());
+    } catch (java.lang.reflect.InvocationTargetException e) {
+      throw e.getCause();
+    }
+  }
+
+  /**
+   * DoFn that registers one bundle-finalization callback from each of {@code @StartBundle},
+   * {@code @ProcessElement} and {@code @FinishBundle}. Each callback increments the counter that
+   * belongs to the lifecycle method that registered it.
+   */
+  static class WithBundleFinalizerDoFn extends DoFn<Long, String> {
+    private static final AtomicInteger startBundleCount = new AtomicInteger(0);
+    private static final AtomicInteger processElementCount = new AtomicInteger(0);
+    private static final AtomicInteger finishBundleCount = new AtomicInteger(0);
+
+    /** Asks {@code finalizer} to increment {@code counter}, with an expiry five minutes out. */
+    private static void registerIncrement(BundleFinalizer finalizer, AtomicInteger counter) {
+      Instant expiry = Instant.now().plus(Duration.standardMinutes(5));
+      finalizer.afterBundleCommit(expiry, counter::incrementAndGet);
+    }
+
+    @StartBundle
+    public void startBundle(StartBundleContext context, BundleFinalizer bundleFinalizer) {
+      registerIncrement(bundleFinalizer, startBundleCount);
+    }
+
+    @ProcessElement
+    public void processElement(ProcessContext c, BundleFinalizer bundleFinalizer) {
+      registerIncrement(bundleFinalizer, processElementCount);
+    }
+
+    @FinishBundle
+    public void finishBundle(FinishBundleContext context, BundleFinalizer bundleFinalizer) {
+      registerIncrement(bundleFinalizer, finishBundleCount);
+    }
+  }
}
diff --git a/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
new file mode 100644
index 00000000000..7361d0be2cd
--- /dev/null
+++ b/runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/work/processing/StreamingCommitFinalizerTest.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.dataflow.worker.windmill.work.processing;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import org.apache.beam.repackaged.core.org.apache.commons.lang3.tuple.Pair;
+import org.apache.beam.runners.dataflow.worker.util.BoundedQueueExecutor;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for {@link StreamingCommitFinalizer}. Covered behavior:
+ *
+ * <ul>
+ *   <li>caching commit-finalization callbacks by finalize id;
+ *   <li>rejecting duplicate ids;
+ *   <li>running a callback once its id is passed to {@code finalizeCommits};
+ *   <li>not running callbacks whose expiry has passed.
+ * </ul>
+ */
+@RunWith(JUnit4.class)
+public class StreamingCommitFinalizerTest {
+
+  /** Upper bound on how long a test polls for expired finalizers to be cleaned up. */
+  private static final Duration CLEANUP_WAIT_TIMEOUT = Duration.standardSeconds(30);
+
+  private StreamingCommitFinalizer finalizer;
+  private BoundedQueueExecutor executor;
+
+  @Before
+  public void setUp() {
+    executor =
+        new BoundedQueueExecutor(
+            10,
+            60,
+            TimeUnit.SECONDS,
+            10,
+            10000000,
+            new ThreadFactoryBuilder()
+                .setNameFormat("FinalizationCallback-%d")
+                .setDaemon(true)
+                .build(),
+            /*useFairMonitor=*/ false);
+    finalizer = StreamingCommitFinalizer.create(executor);
+  }
+
+  @Test
+  public void testCreateAndInit() {
+    assertEquals(0, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testCacheCommitFinalizer() {
+    Runnable callback = mock(Runnable.class);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+    assertEquals(1, finalizer.cleanupQueueSize());
+    verify(callback, never()).run();
+  }
+
+  @Test
+  public void testThrowErrorOnDuplicateIds() {
+    Runnable callback1 = mock(Runnable.class);
+    Instant expiry = Instant.now().plus(Duration.standardHours(1));
+    finalizer.cacheCommitFinalizers(ImmutableMap.of(1L, Pair.of(expiry, callback1)));
+
+    Runnable callback2 = mock(Runnable.class);
+    Map<Long, Pair<Instant, Runnable>> duplicateCallback =
+        ImmutableMap.of(1L, Pair.of(expiry, callback2));
+    assertThrows(
+        IllegalStateException.class, () -> finalizer.cacheCommitFinalizers(duplicateCallback));
+  }
+
+  @Test
+  public void testFinalizeCommits() throws Exception {
+    CountDownLatch callbackExecuted = new CountDownLatch(1);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(
+            1L,
+            Pair.of(
+                Instant.now().plus(Duration.standardHours(1)),
+                () -> callbackExecuted.countDown())));
+    finalizer.finalizeCommits(Collections.singletonList(1L));
+    assertTrue(callbackExecuted.await(30, TimeUnit.SECONDS));
+    assertEquals(0, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testMultipleCommits() throws Exception {
+    CountDownLatch callback1Executed = new CountDownLatch(1);
+    CountDownLatch callback2Executed = new CountDownLatch(1);
+    CountDownLatch callback3Executed = new CountDownLatch(1);
+
+    Instant expiryTime = Instant.now().plus(Duration.standardHours(1));
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+            .put(1L, Pair.of(expiryTime, () -> callback1Executed.countDown()))
+            .put(2L, Pair.of(expiryTime, () -> callback2Executed.countDown()))
+            .put(3L, Pair.of(expiryTime, () -> callback3Executed.countDown()))
+            .build());
+    // Finalize commits one at a time (in different order from added).
+    finalizer.finalizeCommits(Collections.singletonList(2L));
+    assertTrue(callback2Executed.await(30, TimeUnit.SECONDS));
+
+    finalizer.finalizeCommits(Collections.singletonList(3L));
+    assertTrue(callback3Executed.await(30, TimeUnit.SECONDS));
+
+    finalizer.finalizeCommits(Collections.singletonList(1L));
+    assertTrue(callback1Executed.await(30, TimeUnit.SECONDS));
+
+    assertEquals(0, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testIgnoresUnknownIds() throws Exception {
+    Runnable callback = mock(Runnable.class);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(1L, Pair.of(Instant.now().plus(Duration.standardHours(1)), callback)));
+    finalizer.finalizeCommits(Collections.singletonList(2L));
+    // NOTE(review): presumably the single outstanding element is the finalizer's own cleanup
+    // task, i.e. no callback was scheduled for the unknown id -- confirm against
+    // StreamingCommitFinalizer.
+    assertEquals(1, executor.elementsOutstanding());
+    verify(callback, never()).run();
+    assertEquals(1, finalizer.cleanupQueueSize());
+  }
+
+  @Test
+  public void testCleanupOnExpiration() throws Exception {
+    CountDownLatch callback1Executed = new CountDownLatch(1);
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.of(
+            1L,
+            Pair.of(
+                Instant.now().plus(Duration.standardHours(1)),
+                () -> callback1Executed.countDown())));
+    assertEquals(1, finalizer.cleanupQueueSize());
+
+    Runnable callback2 = mock(Runnable.class);
+    Runnable callback3 = mock(Runnable.class);
+    Instant shortTimeout = Instant.now().plus(Duration.millis(100));
+    finalizer.cacheCommitFinalizers(
+        ImmutableMap.<Long, Pair<Instant, Runnable>>builder()
+            .put(2L, Pair.of(shortTimeout, callback2))
+            .put(3L, Pair.of(shortTimeout, callback3))
+            .build());
+
+    // Wait until the two 100ms timeouts expire and are cleaned up. Fail instead of hanging
+    // forever if cleanup never happens.
+    Instant deadline = Instant.now().plus(CLEANUP_WAIT_TIMEOUT);
+    while (finalizer.cleanupQueueSize() > 1) {
+      assertTrue(
+          "Timed out waiting for expired finalizers to be cleaned up",
+          Instant.now().isBefore(deadline));
+      Thread.sleep(50);
+    }
+    assertEquals(1, executor.elementsOutstanding());
+    finalizer.finalizeCommits(ImmutableList.of(2L, 3L));
+    verify(callback2, never()).run();
+    verify(callback3, never()).run();
+
+    finalizer.finalizeCommits(Collections.singletonList(1L));
+    assertTrue(callback1Executed.await(30, TimeUnit.SECONDS));
+    // Callbacks run asynchronously on the executor. Re-check after a later callback has
+    // completed, so that a wrongly scheduled expired callback is more likely to have run.
+    verify(callback2, never()).run();
+    verify(callback3, never()).run();
+    assertEquals(0, finalizer.cleanupQueueSize());
+  }
+}
diff --git a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
index 0b179c4d338..ca650907ffe 100644
--- a/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
+++ b/runners/google-cloud-dataflow-java/worker/windmill/src/main/proto/windmill.proto
@@ -447,6 +447,10 @@ message WorkItem {
// elements mapped to a single key to impact pipeline performance. When
// present, this field includes metadata associated with any hot key.
optional HotKeyInfo hot_key_info = 11;
+
+ repeated int64 applied_finalize_ids = 16;
+
+ reserved 12, 13, 14, 15;
}
message ComputationWorkItems {
@@ -653,10 +657,16 @@ message WorkItemCommitRequest {
// Collected work item processing state durations.
repeated LatencyAttribution per_work_item_latency_attributions = 27;
+ // Ids that will be passed back as applied_finalize_ids in a subsequent
+ // GetWorkResponse once the state in this request has been persisted to disk
+ // successfully. This is best-effort; it is possible that state is persisted
+ // but the finalize ids are not sent to the worker.
+ repeated int64 finalize_ids = 19 [packed = true];
+
// DEPRECATED
repeated GlobalDataId global_data_id_requests = 9;
- reserved 6, 19, 23;
+ reserved 6, 23;
}
message ComputationCommitWorkRequest {
@@ -791,11 +801,14 @@ message StreamingGetWorkResponseChunk {
// from other stream_ids may be interleaved on the physical stream.
optional fixed64 stream_id = 4;
+ // Finalize ids associated with successfully applied work from this worker
+ repeated int64 applied_finalize_ids = 6 [packed = true];
+
// Timing infos for the work item. Windmill Dispatcher and user worker should
// propagate critical event timings if the list is not empty.
repeated GetWorkStreamTimingInfo per_work_item_timing_infos = 8;
- // reserved field 5
+ reserved 5, 7;
}
message ComputationWorkItemMetadata {