m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1757421612


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      if (pendingRequest == null) {
+        // Skip responses when the stream is shutdown since they are now invalid.
+        if (!isShutdown()) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
       } else {
         try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
               (i < response.getStatusCount()) ? response.getStatus(i) : CommitStatus.OK);
         } catch (RuntimeException e) {
           // Catch possible exceptions to ensure that an exception for one commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to throw after
+          // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
-          finalException = e;
+          if (failure == null) failure = e;

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to