This is an automated email from the ASF dual-hosted git repository. scwhittle pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 5494f114382 [Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30229) 5494f114382 is described below commit 5494f1143827e0e6fec9e331b93c00c83d10c66e Author: Arun Pandian <arunpandi...@gmail.com> AuthorDate: Tue Feb 6 03:34:48 2024 -0800 [Dataflow Streaming] Invalidate caches and remove work on failure before commit (#30229) * Invalidate caches and remove work on failure before commit * Prevent completeWorkAndScheduleNextWorkForKey from throwing --------- Co-authored-by: Arun Pandian <pandi...@google.com> --- .../dataflow/worker/StreamingDataflowWorker.java | 15 +++++++++++---- .../dataflow/worker/streaming/ActiveWorkState.java | 17 +++++++---------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java index 3ba27bd852f..14efdcc5eb0 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingDataflowWorker.java @@ -1397,12 +1397,21 @@ public class StreamingDataflowWorker { // Adds the commit to the commitStream if it fits, returning true iff it is consumed. private boolean addCommitToStream(Commit commit, CommitWorkStream commitStream) { Preconditions.checkNotNull(commit); + final ComputationState state = commit.computationState(); + final Windmill.WorkItemCommitRequest request = commit.request(); // Drop commits for failed work. Such commits will be dropped by Windmill anyway. if (commit.work().isFailed()) { + readerCache.invalidateReader( + WindmillComputationKey.create( + state.getComputationId(), request.getKey(), request.getShardingKey())); + stateCache + .forComputation(state.getComputationId()) + .invalidate(request.getKey(), request.getShardingKey()); + state.completeWorkAndScheduleNextWorkForKey( + ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken()); return true; } - final ComputationState state = commit.computationState(); - final Windmill.WorkItemCommitRequest request = commit.request(); + final int size = commit.getSize(); commit.work().setState(Work.State.COMMITTING); activeCommitBytes.addAndGet(size); @@ -1419,8 +1428,6 @@ public class StreamingDataflowWorker { .invalidate(request.getKey(), request.getShardingKey()); } activeCommitBytes.addAndGet(-size); - // This may throw an exception if the commit was not active, which is possible if it - // was deemed stuck. state.completeWorkAndScheduleNextWorkForKey( ShardedKey.create(request.getKey(), request.getShardingKey()), request.getWorkToken()); diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java index 54942dfeee1..ff46356d956 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/streaming/ActiveWorkState.java @@ -188,16 +188,13 @@ public final class ActiveWorkState { private synchronized void removeCompletedWorkFromQueue( Queue<Work> workQueue, ShardedKey shardedKey, long workToken) { - // avoid Preconditions.checkState here to prevent eagerly evaluating the - // format string parameters for the error message. - Work completedWork = - Optional.ofNullable(workQueue.peek()) - .orElseThrow( - () -> - new IllegalStateException( - String.format( - "Active key %s without work, expected token %d", - shardedKey, workToken))); + Work completedWork = workQueue.peek(); + if (completedWork == null) { + // Work may have been completed due to clearing of stuck commits. + LOG.warn( + String.format("Active key %s without work, expected token %d", shardedKey, workToken)); + return; + } if (completedWork.getWorkItem().getWorkToken() != workToken) { // Work may have been completed due to clearing of stuck commits.