m-trieu commented on code in PR #31902: URL: https://github.com/apache/beam/pull/31902#discussion_r1757421612
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();

-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      if (pendingRequest == null) {
+        // Skip responses when the stream is shutdown since they are now invalid.
+        if (!isShutdown()) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
       } else {
         try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
               (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
         } catch (RuntimeException e) {
           // Catch possible exceptions to ensure that an exception for one commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to throw after
+          // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
-          finalException = e;
+          if (failure == null) failure = e;

Review Comment:
   done

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org