m-trieu commented on code in PR #31902: URL: https://github.com/apache/beam/pull/31902#discussion_r1757345960
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -303,71 +368,78 @@ public void onNext(ResponseT response) { @Override public void onError(Throwable t) { - onStreamFinished(t); + if (isStreamDone()) { + return; + } + + Status status = Status.fromThrowable(t); + setLastError(status.toString()); + // Don't log every error since it will get noisy, and many errors transient. + if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) { + long nowMillis = Instant.now().getMillis(); + String responseDebug; + if (lastResponseTimeMs.get() == 0) { + responseDebug = "never received response"; + } else { + responseDebug = "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago"; + } + LOG.debug( + "{} streaming Windmill RPC errors for {}, last was: {} with status {}." + + " created {}ms ago, {}. This is normal with autoscaling.", + AbstractWindmillStream.this.getClass(), + errorCount.get(), + t, + status, + nowMillis - startTimeMs.get(), + responseDebug); + } + + // If the stream was stopped due to a resource exhausted error then we are throttled. + if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { + startThrottleTimer(); + } + + try { + long sleep = backoff.nextBackOffMillis(); + sleepUntil.set(Instant.now().getMillis() + sleep); + sleeper.sleep(sleep); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } catch (IOException e) { + // Ignore. + } + + tryRestartStream(); } @Override public void onCompleted() { - onStreamFinished(null); + if (isStreamDone()) { + return; + } + errorCount.incrementAndGet(); + String error = + "Stream completed successfully but did not complete requested operations, " + + "recreating"; + LOG.warn(error); + setLastError(error); + tryRestartStream(); } - private void onStreamFinished(@Nullable Throwable t) { - synchronized (this) { - if (clientClosed.get() && !hasPendingRequests()) { - streamRegistry.remove(AbstractWindmillStream.this); - finishLatch.countDown(); - return; - } + private void tryRestartStream() { + if (!isShutdown()) { + executeSafely(AbstractWindmillStream.this::startStream); } - if (t != null) { - Status status = null; - if (t instanceof StatusRuntimeException) { - status = ((StatusRuntimeException) t).getStatus(); - } - String statusError = status == null ? "" : status.toString(); - setLastError(statusError); - if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) { - long nowMillis = Instant.now().getMillis(); - String responseDebug; - if (lastResponseTimeMs.get() == 0) { - responseDebug = "never received response"; - } else { - responseDebug = - "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago"; - } - LOG.debug( - "{} streaming Windmill RPC errors for {}, last was: {} with status {}." - + " created {}ms ago, {}. This is normal with autoscaling.", - AbstractWindmillStream.this.getClass(), - errorCount.get(), - t, - statusError, - nowMillis - startTimeMs.get(), - responseDebug); - } - // If the stream was stopped due to a resource exhausted error then we are throttled. - if (status != null && status.getCode() == Status.Code.RESOURCE_EXHAUSTED) { - startThrottleTimer(); - } + } - try { - long sleep = backoff.nextBackOffMillis(); - sleepUntil.set(Instant.now().getMillis() + sleep); - Thread.sleep(sleep); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } catch (IOException e) { - // Ignore. - } - } else { - errorCount.incrementAndGet(); - String error = - "Stream completed successfully but did not complete requested operations, " - + "recreating"; - LOG.warn(error); - setLastError(error); + private synchronized boolean isStreamDone() { + if (isShutdown() || (clientClosed.get() && !hasPendingRequests())) { + streamRegistry.remove(AbstractWindmillStream.this); Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org