m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1757373330
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -185,49 +202,69 @@ protected final void startStream() {
     while (true) {
       try {
         synchronized (this) {
+          if (isShutdown.get()) {
+            break;
+          }
           startTimeMs.set(Instant.now().getMillis());
           lastResponseTimeMs.set(0);
           streamClosed.set(false);
-          // lazily initialize the requestObserver. Gets reset whenever the stream is reopened.
-          requestObserver = requestObserverSupplier.get();
+          requestObserver.reset();
           onNewStream();
           if (clientClosed.get()) {
             halfClose();
           }
           return;
         }
       } catch (Exception e) {
-        LOG.error("Failed to create new stream, retrying: ", e);
+        logger.error("Failed to create new stream, retrying: ", e);
         try {
           long sleep = backoff.nextBackOffMillis();
           sleepUntil.set(Instant.now().getMillis() + sleep);
-          Thread.sleep(sleep);
-        } catch (InterruptedException | IOException i) {
+          sleeper.sleep(sleep);
+        } catch (InterruptedException ie) {
+          Thread.currentThread().interrupt();
+          break;
Review Comment:
do we want to here? It's only called when the streams are created and by the
internal `executor()` when the stream is restarted after
`streamObserver#onClose` or `streamObserver#onError`. We should maybe log an
info/debug message here, keep the interrupt status of the thread, and just break
out of the loop.
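
A minimal sketch of what that suggestion could look like, outside of Beam. Everything here is hypothetical and only for illustration: `RetryLoopSketch`, `Sleeper`, `StreamStarter`, `startWithRetry`, and the fixed `backoffMillis` are stand-ins, not the actual `AbstractWindmillStream` API, and `System.out.println` stands in for an info/debug log call.

```java
import java.util.concurrent.atomic.AtomicBoolean;

class RetryLoopSketch {
  /** Hypothetical stand-in for the sleeper used in the diff. */
  interface Sleeper {
    void sleep(long millis) throws InterruptedException;
  }

  /** Hypothetical stand-in for "create the stream"; throws to simulate a failure. */
  interface StreamStarter {
    void start() throws Exception;
  }

  /**
   * Retries starter.start() until it succeeds, shutdown is requested, or the
   * backoff sleep is interrupted. Returns true only if the stream was started.
   */
  static boolean startWithRetry(
      StreamStarter starter, Sleeper sleeper, AtomicBoolean isShutdown, long backoffMillis) {
    while (true) {
      if (isShutdown.get()) {
        break;
      }
      try {
        starter.start();
        return true;
      } catch (Exception e) {
        try {
          sleeper.sleep(backoffMillis);
        } catch (InterruptedException ie) {
          // Log at a low level (stand-in for logger.info/debug), keep the
          // interrupt status for callers up the stack, and stop retrying.
          System.out.println("Interrupted while backing off; not retrying.");
          Thread.currentThread().interrupt();
          break;
        }
      }
    }
    return false;
  }

  public static void main(String[] args) {
    boolean started =
        startWithRetry(
            () -> {
              throw new IllegalStateException("simulated stream failure");
            },
            millis -> {
              throw new InterruptedException();
            },
            new AtomicBoolean(false),
            10L);
    System.out.println("started=" + started);
    System.out.println("interrupted=" + Thread.currentThread().isInterrupted());
  }
}
```

Restoring the flag with `Thread.currentThread().interrupt()` before breaking means the executor thread that ran the restart can still observe the interruption after the loop exits.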