This is an automated email from the ASF dual-hosted git repository. goenka pushed a commit to branch release-2.32.0 in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/release-2.32.0 by this push: new 8e32f3b [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226) 8e32f3b is described below commit 8e32f3b9a67dc5df63069b6f0271c97158cc9639 Author: Ankur <angoe...@users.noreply.github.com> AuthorDate: Wed Jul 28 13:01:53 2021 -0700 [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226) [BEAM-12661] Fix stuck GetData Windmill calls Co-authored-by: slavachernyak <chern...@google.com> --- .../beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java index 81ae092..3c06ee9 100644 --- a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java +++ b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java @@ -1525,8 +1525,8 @@ public class GrpcWindmillServer extends WindmillServerStub { try { blockedStartMs.set(Instant.now().getMillis()); - current = queue.take(); - if (current != POISON_PILL) { + current = queue.poll(180, TimeUnit.SECONDS); + if (current != null && current != POISON_PILL) { return true; } if (cancelled.get()) { @@ -1535,7 +1535,8 @@ public class GrpcWindmillServer extends WindmillServerStub { if (complete.get()) { return false; } - throw new IllegalStateException("Got poison pill but stream is not done."); + throw new IllegalStateException( + "Got poison pill or timeout but stream is not done."); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new CancellationException();