m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1757345960


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -303,71 +368,78 @@ public void onNext(ResponseT response) {
 
     @Override
     public void onError(Throwable t) {
-      onStreamFinished(t);
+      if (isStreamDone()) {
+        return;
+      }
+
+      Status status = Status.fromThrowable(t);
+      setLastError(status.toString());
+      // Don't log every error since it will get noisy, and many errors transient.
+      if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+        long nowMillis = Instant.now().getMillis();
+        String responseDebug;
+        if (lastResponseTimeMs.get() == 0) {
+          responseDebug = "never received response";
+        } else {
+          responseDebug = "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
+        }
+        LOG.debug(
+            "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
+                + " created {}ms ago, {}. This is normal with autoscaling.",
+            AbstractWindmillStream.this.getClass(),
+            errorCount.get(),
+            t,
+            status,
+            nowMillis - startTimeMs.get(),
+            responseDebug);
+      }
+
+      // If the stream was stopped due to a resource exhausted error then we are throttled.
+      if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+        startThrottleTimer();
+      }
+
+      try {
+        long sleep = backoff.nextBackOffMillis();
+        sleepUntil.set(Instant.now().getMillis() + sleep);
+        sleeper.sleep(sleep);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      } catch (IOException e) {
+        // Ignore.
+      }
+
+      tryRestartStream();
     }
 
     @Override
     public void onCompleted() {
-      onStreamFinished(null);
+      if (isStreamDone()) {
+        return;
+      }
+      errorCount.incrementAndGet();
+      String error =
+          "Stream completed successfully but did not complete requested operations, "
+              + "recreating";
+      LOG.warn(error);
+      setLastError(error);
+      tryRestartStream();
     }
 
-    private void onStreamFinished(@Nullable Throwable t) {
-      synchronized (this) {
-        if (clientClosed.get() && !hasPendingRequests()) {
-          streamRegistry.remove(AbstractWindmillStream.this);
-          finishLatch.countDown();
-          return;
-        }
+    private void tryRestartStream() {
+      if (!isShutdown()) {
+        executeSafely(AbstractWindmillStream.this::startStream);
       }
-      if (t != null) {
-        Status status = null;
-        if (t instanceof StatusRuntimeException) {
-          status = ((StatusRuntimeException) t).getStatus();
-        }
-        String statusError = status == null ? "" : status.toString();
-        setLastError(statusError);
-        if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
-          long nowMillis = Instant.now().getMillis();
-          String responseDebug;
-          if (lastResponseTimeMs.get() == 0) {
-            responseDebug = "never received response";
-          } else {
-            responseDebug =
-                "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
-          }
-          LOG.debug(
-              "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
-                  + " created {}ms ago, {}. This is normal with autoscaling.",
-              AbstractWindmillStream.this.getClass(),
-              errorCount.get(),
-              t,
-              statusError,
-              nowMillis - startTimeMs.get(),
-              responseDebug);
-        }
-        // If the stream was stopped due to a resource exhausted error then we are throttled.
-        if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
-          startThrottleTimer();
-        }
+    }
 
-        try {
-          long sleep = backoff.nextBackOffMillis();
-          sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        } catch (IOException e) {
-          // Ignore.
-        }
-      } else {
-        errorCount.incrementAndGet();
-        String error =
-            "Stream completed successfully but did not complete requested operations, "
-                + "recreating";
-        LOG.warn(error);
-        setLastError(error);
+    private synchronized boolean isStreamDone() {
+      if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) {
+        streamRegistry.remove(AbstractWindmillStream.this);

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to