m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1730013592
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -303,71 +368,78 @@ public void onNext(ResponseT response) {
@Override
public void onError(Throwable t) {
- onStreamFinished(t);
+ if (isStreamDone()) {
+ return;
+ }
+
+ Status status = Status.fromThrowable(t);
+ setLastError(status.toString());
+ // Don't log every error since it will get noisy, and many errors transient.
+ if (errorCount.getAndIncrement() % logEveryNStreamFailures == 0) {
+ long nowMillis = Instant.now().getMillis();
+ String responseDebug;
+ if (lastResponseTimeMs.get() == 0) {
+ responseDebug = "never received response";
+ } else {
+ responseDebug = "received response " + (nowMillis - lastResponseTimeMs.get()) + "ms ago";
+ }
+ LOG.debug(
+ "{} streaming Windmill RPC errors for {}, last was: {} with status {}."
+ + " created {}ms ago, {}. This is normal with autoscaling.",
+ AbstractWindmillStream.this.getClass(),
+ errorCount.get(),
+ t,
+ status,
+ nowMillis - startTimeMs.get(),
+ responseDebug);
+ }
+
+ // If the stream was stopped due to a resource exhausted error then we are throttled.
+ if (status.getCode() == Status.Code.RESOURCE_EXHAUSTED) {
+ startThrottleTimer();
+ }
+
+ try {
+ long sleep = backoff.nextBackOffMillis();
+ sleepUntil.set(Instant.now().getMillis() + sleep);
+ sleeper.sleep(sleep);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ } catch (IOException e) {
+ // Ignore.
+ }
+
+ tryRestartStream();
}
@Override
public void onCompleted() {
- onStreamFinished(null);
+ if (isStreamDone()) {
+ return;
+ }
+ errorCount.incrementAndGet();
+ String error =
+ "Stream completed successfully but did not complete requested operations, "
+ + "recreating";
+ LOG.warn(error);
+ setLastError(error);
+ tryRestartStream();
}
- private void onStreamFinished(@Nullable Throwable t) {
- synchronized (this) {
- if (clientClosed.get() && !hasPendingRequests()) {
- streamRegistry.remove(AbstractWindmillStream.this);
- finishLatch.countDown();
- return;
- }
+ private void tryRestartStream() {
+ if (!isShutdown()) {
Review Comment:
done
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]