This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch revert-30215-fail-commit
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 55a5b6b84f4fdcd5363c60296c335a8ed27361ce
Author: Sam Whittle <scwhit...@users.noreply.github.com>
AuthorDate: Tue Feb 6 11:23:28 2024 +0100

    Revert "When failing work items during commit, make sure to call completeWork…"

    This reverts commit b0f2eebb0244302ac2315dc260536512d229401f.
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index b48032677ff..3ba27bd852f 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,20 +1397,12 @@ public class StreamingDataflowWorker {
   // Adds the commit to the commitStream if it fits, returning true iff it is consumed.
   private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) {
     Preconditions.checkNotNull(commit);
-    final ComputationState state = commit.computationState();
-    final Windmill.WorkItemCommitRequest request = commit.request();
     // Drop commits for failed work. Such commits will be dropped by Windmill anyway.
     if (commit.work().isFailed()) {
-      readerCache.invalidateReader(
-          WindmillComputationKey.create(
-              state.getComputationId(), request.getKey(), request.getShardingKey()));
-      stateCache
-          .forComputation(state.getComputationId())
-          .invalidate(request.getKey(), request.getShardingKey());
-      state.completeWorkAndScheduleNextWorkForKey(
-          ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken());
       return true;
     }
+    final ComputationState state = commit.computationState();
+    final Windmill.WorkItemCommitRequest request = commit.request();
     final int size = commit.getSize();
     commit.work().setState(Work.State.COMMITTING);
     activeCommitBytes.addAndGet(size);