This is an automated email from the ASF dual-hosted git repository.

scwhittle pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 5494f114382 [Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30229)
5494f114382 is described below

commit 5494f1143827e0e6fec9e331b93c00c83d10c66e
Author: Arun Pandian <arunpandi...@gmail.com>
AuthorDate: Tue Feb 6 03:34:48 2024 -0800

    [Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30229)
    
    * Invalidate caches and remove work on failure before commit
    * Prevent completeWorkAndScheduleNextWorkForKey from throwing
    
    ---------
    
    Co-authored-by: Arun Pandian <pandi...@google.com>
---
 .../dataflow/worker/StreamingDataflowWorker.java        | 15 +++++++++++----
 .../dataflow/worker/streaming/ActiveWorkState.java      | 17 +++++++----------
 2 files changed, 18 insertions(+), 14 deletions(-)

diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
index 3ba27bd852f..14efdcc5eb0 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java
@@ -1397,12 +1397,21 @@ public class StreamingDataflowWorker {
   // Adds the commit to the commitStream if it fits, returning true iff it is 
consumed.
   private boolean addCommitToStream(Commit commit, CommitWorkStream 
commitStream) {
     Preconditions.checkNotNull(commit);
+    final ComputationState state = commit.computationState();
+    final Windmill.WorkItemCommitRequest request = commit.request();
     // Drop commits for failed work. Such commits will be dropped by Windmill 
anyway.
     if (commit.work().isFailed()) {
+      readerCache.invalidateReader(
+          WindmillComputationKey.create(
+              state.getComputationId(), request.getKey(), 
request.getShardingKey()));
+      stateCache
+          .forComputation(state.getComputationId())
+          .invalidate(request.getKey(), request.getShardingKey());
+      state.completeWorkAndScheduleNextWorkForKey(
+          ShardedKey.create(request.getKey(), request.getShardingKey()), 
request.getWorkToken());
       return true;
     }
-    final ComputationState state = commit.computationState();
-    final Windmill.WorkItemCommitRequest request = commit.request();
+
     final int size = commit.getSize();
     commit.work().setState(Work.State.COMMITTING);
     activeCommitBytes.addAndGet(size);
@@ -1419,8 +1428,6 @@ public class StreamingDataflowWorker {
                 .invalidate(request.getKey(), request.getShardingKey());
           }
           activeCommitBytes.addAndGet(-size);
-          // This may throw an exception if the commit was not active, which 
is possible if it
-          // was deemed stuck.
           state.completeWorkAndScheduleNextWorkForKey(
               ShardedKey.create(request.getKey(), request.getShardingKey()),
               request.getWorkToken());
diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
index 54942dfeee1..ff46356d956 100644
--- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
+++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java
@@ -188,16 +188,13 @@ public final class ActiveWorkState {
 
   private synchronized void removeCompletedWorkFromQueue(
       Queue<Work> workQueue, ShardedKey shardedKey, long workToken) {
-    // avoid Preconditions.checkState here to prevent eagerly evaluating the
-    // format string parameters for the error message.
-    Work completedWork =
-        Optional.ofNullable(workQueue.peek())
-            .orElseThrow(
-                () ->
-                    new IllegalStateException(
-                        String.format(
-                            "Active key %s without work, expected token %d",
-                            shardedKey, workToken)));
+    Work completedWork = workQueue.peek();
+    if (completedWork == null) {
+      // Work may have been completed due to clearing of stuck commits.
+      LOG.warn(
+          String.format("Active key %s without work, expected token %d", 
shardedKey, workToken));
+      return;
+    }
 
     if (completedWork.getWorkItem().getWorkToken() != workToken) {
       // Work may have been completed due to clearing of stuck commits.

Reply via email to