This is an automated email from the ASF dual-hosted git repository.

goenka pushed a commit to branch release-2.32.0
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/release-2.32.0 by this push:
     new 8e32f3b  [BEAM-12661] Fix stuck GetData Windmill calls (#15224) 
(#15226)
8e32f3b is described below

commit 8e32f3b9a67dc5df63069b6f0271c97158cc9639
Author: Ankur <angoe...@users.noreply.github.com>
AuthorDate: Wed Jul 28 13:01:53 2021 -0700

    [BEAM-12661] Fix stuck GetData Windmill calls (#15224) (#15226)
    
    [BEAM-12661] Fix stuck GetData Windmill calls
    
    Co-authored-by: slavachernyak <chern...@google.com>
---
 .../beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java  | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
index 81ae092..3c06ee9 100644
--- 
a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
+++ 
b/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/GrpcWindmillServer.java
@@ -1525,8 +1525,8 @@ public class GrpcWindmillServer extends 
WindmillServerStub {
 
                 try {
                   blockedStartMs.set(Instant.now().getMillis());
-                  current = queue.take();
-                  if (current != POISON_PILL) {
+                  current = queue.poll(180, TimeUnit.SECONDS);
+                  if (current != null && current != POISON_PILL) {
                     return true;
                   }
                   if (cancelled.get()) {
@@ -1535,7 +1535,8 @@ public class GrpcWindmillServer extends 
WindmillServerStub {
                   if (complete.get()) {
                     return false;
                   }
-                  throw new IllegalStateException("Got poison pill but stream 
is not done.");
+                  throw new IllegalStateException(
+                      "Got poison pill or timeout but stream is not done.");
                 } catch (InterruptedException e) {
                   Thread.currentThread().interrupt();
                   throw new CancellationException();

Reply via email to