m-trieu commented on code in PR #31902:
URL: https://github.com/apache/beam/pull/31902#discussion_r1757456193
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -156,29 +162,44 @@ public void sendHealthCheck() {
protected void onResponse(StreamingCommitResponse response) {
commitWorkThrottleTimer.stop();
- RuntimeException finalException = null;
+ @Nullable RuntimeException failure = null;
for (int i = 0; i < response.getRequestIdCount(); ++i) {
long requestId = response.getRequestId(i);
if (requestId == HEARTBEAT_REQUEST_ID) {
continue;
}
- PendingRequest done = pending.remove(requestId);
- if (done == null) {
- LOG.error("Got unknown commit request ID: {}", requestId);
+ PendingRequest pendingRequest = pending.remove(requestId);
+ if (pendingRequest == null) {
+ // Skip responses when the stream is shutdown since they are now
invalid.
+ if (!isShutdown()) {
+ LOG.error("Got unknown commit request ID: {}", requestId);
+ }
} else {
try {
- done.onDone.accept(
+ pendingRequest.completeWithStatus(
(i < response.getStatusCount()) ? response.getStatus(i) :
CommitStatus.OK);
} catch (RuntimeException e) {
// Catch possible exceptions to ensure that an exception for one
commit does not prevent
- // other commits from being processed.
+ // other commits from being processed. Aggregate all the failures to
throw after
+ // processing the response if they exist.
LOG.warn("Exception while processing commit response.", e);
- finalException = e;
+ if (failure == null) failure = e;
+ else failure.addSuppressed(e);
}
}
}
- if (finalException != null) {
- throw finalException;
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ @Override
+ protected void shutdownInternal() {
+ Iterator<PendingRequest> pendingRequests = pending.values().iterator();
+ while (pendingRequests.hasNext()) {
+ PendingRequest pendingRequest = pendingRequests.next();
Review Comment:
```
/**
 * Pops the head of {@code workQueue} when it is the work item identified by {@code workId},
 * then hands that item's work to {@code decrementActiveWorkBudget}.
 *
 * <p>If the queue is empty, or its head has a different id, nothing is removed and a warning is
 * logged instead; the work may already have been completed due to clearing of stuck commits.
 *
 * @param workQueue queue whose head is expected to be the completed work
 * @param shardedKey key reported in the warning messages
 * @param workId id of the work item that is being completed
 */
private synchronized void removeCompletedWorkFromQueue(
    Queue<ExecutableWork> workQueue, ShardedKey shardedKey, WorkId workId) {
  // Plain if-checks are used instead of Preconditions.checkState so that the arguments of the
  // error message format string are not evaluated eagerly.
  ExecutableWork head = workQueue.peek();
  if (head == null) {
    // Empty queue: the work may have been completed due to clearing of stuck commits.
    LOG.warn("Active key {} without work, expected token {}", shardedKey, workId);
    return;
  }

  if (head.id().equals(workId)) {
    // The head is the matching work item, so consume it.
    workQueue.remove();
    decrementActiveWorkBudget(head.work());
  } else {
    // Different id at the head: the work may have been completed due to clearing of stuck
    // commits.
    LOG.warn(
        "Unable to complete due to token mismatch for "
            + "key {},"
            + "expected work_id {}, "
            + "actual work_id was {}",
        shardedKey,
        workId,
        head.id());
  }
}
```
This is the completion logic for removing work from the queue; it is the main
logic that prevents stuckness when we drain the commits.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]