This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch revert-30215-fail-commit
in repository https://gitbox.apache.org/repos/asf/beam.git

commit 55a5b6b84f4fdcd5363c60296c335a8ed27361ce
Author: Sam Whittle <scwhit...@users.noreply.github.com>
AuthorDate: Tue Feb 6 11:23:28 2024 +0100

    Revert "When failing work items during commit, make sure to call 
completeWork…"
    
    This reverts commit b0f2eebb0244302ac2315dc260536512d229401f.
---
 .../runners/dataflow/worker/StreamingDataflowWorker.java     | 12 ++----------
 1 file changed, 2 insertions(+), 10 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index b48032677ff..3ba27bd852f 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,20 +1397,12 @@ public class StreamingDataflowWorker {
   // Adds the commit to the commitStream if it fits, returning true iff it is 
consumed.
   private boolean addCommitToStream(Commit commit, CommitWorkStream 
commitStream) {
     Preconditions.checkNotNull(commit);
-    final ComputationState state = commit.computationState();
-    final Windmill.WorkItemCommitRequest request = commit.request();
     // Drop commits for failed work. Such commits will be dropped by Windmill 
anyway.
     if (commit.work().isFailed()) {
-      readerCache.invalidateReader(
-          WindmillComputationKey.create(
-              state.getComputationId(), request.getKey(), 
request.getShardingKey()));
-      stateCache
-          .forComputation(state.getComputationId())
-          .invalidate(request.getKey(), request.getShardingKey());
-      state.completeWorkAndScheduleNextWorkForKey(
-          ShardedKey.create(request.getKey(), request.getShardingKey()), 
request.getWorkToken());
       return true;
     }
+    final ComputationState state = commit.computationState();
+    final Windmill.WorkItemCommitRequest request = commit.request();
     final int size = commit.getSize();
     commit.work().setState(Work.State.COMMITTING);
     activeCommitBytes.addAndGet(size);

Reply via email to