m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1757438912


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,44 @@ public void sendHealthCheck() {
   protected void onResponse(StreamingCommitResponse response) {
     commitWorkThrottleTimer.stop();
 
-    RuntimeException finalException = null;
+    @Nullable RuntimeException failure = null;
     for (int i = 0; i < response.getRequestIdCount(); ++i) {
       long requestId = response.getRequestId(i);
       if (requestId == HEARTBEAT_REQUEST_ID) {
         continue;
       }
-      PendingRequest done = pending.remove(requestId);
-      if (done == null) {
-        LOG.error("Got unknown commit request ID: {}", requestId);
+      PendingRequest pendingRequest = pending.remove(requestId);
+      if (pendingRequest == null) {
+        // Skip responses when the stream is shutdown since they are now 
invalid.
+        if (!isShutdown()) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+        }
       } else {
         try {
-          done.onDone.accept(
+          pendingRequest.completeWithStatus(
               (i < response.getStatusCount()) ? response.getStatus(i) : 
CommitStatus.OK);
         } catch (RuntimeException e) {
           // Catch possible exceptions to ensure that an exception for one 
commit does not prevent
-          // other commits from being processed.
+          // other commits from being processed. Aggregate all the failures to 
throw after
+          // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
-          finalException = e;
+          if (failure == null) failure = e;
+          else failure.addSuppressed(e);
         }
       }
     }
-    if (finalException != null) {
-      throw finalException;
+    if (failure != null) {
+      throw failure;
+    }
+  }
+
+  @Override
+  protected void shutdownInternal() {
+    Iterator<PendingRequest> pendingRequests = pending.values().iterator();
+    while (pendingRequests.hasNext()) {
+      PendingRequest pendingRequest = pendingRequests.next();

Review Comment:
   if we call complete with status twice i think it is a no-op since it 
completion is synchronized and it will just log that we couldn't complete the 
work item since it doesn't exist.
   
   is it a concern that we call complete twice?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to