arunpandianp commented on code in PR #35327: URL: https://github.com/apache/beam/pull/35327#discussion_r2160964856
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -139,62 +152,83 @@ public CommitWorkStream.RequestBatcher batcher() { } @Override - protected boolean hasPendingRequests() { - return !pending.isEmpty(); - } - - @Override - protected void sendHealthCheck() throws WindmillStreamShutdownException { - if (hasPendingRequests()) { + protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException { + if (currentPhysicalStream.hasPendingRequests()) { StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder(); builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID); trySend(builder.build()); } } - @Override - protected void onResponse(StreamingCommitResponse response) { - CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler(); - for (int i = 0; i < response.getRequestIdCount(); ++i) { - long requestId = response.getRequestId(i); - if (requestId == HEARTBEAT_REQUEST_ID) { - continue; - } + private class CommitWorkPhysicalStreamHandler extends PhysicalStreamHandler { + @Override + public void onResponse(StreamingCommitResponse response) { + CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler(); + for (int i = 0; i < response.getRequestIdCount(); ++i) { + long requestId = response.getRequestId(i); + if (requestId == HEARTBEAT_REQUEST_ID) { + continue; + } + + // From windmill.proto: Indices must line up with the request_id field, but trailing OKs + // may be omitted. + CommitStatus commitStatus = + i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK; - // From windmill.proto: Indices must line up with the request_id field, but trailing OKs may - // be omitted. - CommitStatus commitStatus = - i < response.getStatusCount() ? 
response.getStatus(i) : CommitStatus.OK; - - @Nullable PendingRequest pendingRequest = pending.remove(requestId); - if (pendingRequest == null) { - synchronized (this) { - if (!isShutdown) { - // Missing responses are expected after shutdown() because it removes them. - LOG.error("Got unknown commit request ID: {}", requestId); - } + @Nullable + KV<CommitWorkPhysicalStreamHandler, PendingRequest> entry = pending.remove(requestId); + if (entry == null) { + LOG.error("Got unknown commit request ID: {}", requestId); + continue; } - } else { + if (entry.getKey() != this) { + LOG.error("Got commit request id {} on unexpected stream", requestId); + } + PendingRequest pendingRequest = entry.getValue(); try { pendingRequest.completeWithStatus(commitStatus); } catch (RuntimeException e) { - // Catch possible exceptions to ensure that an exception for one commit does not prevent + // Catch possible exceptions to ensure that an exception for one commit does not + // prevent // other commits from being processed. Aggregate all the failures to throw after // processing the response if they exist. LOG.warn("Exception while processing commit response.", e); failureHandler.addError(commitStatus, e); } } + + failureHandler.throwIfNonEmpty(); + } + + @Override + public boolean hasPendingRequests() { + return pending.entrySet().stream().anyMatch(e -> e.getValue().getKey() == this); + } + + @Override + public void onDone(Status status) { + if (status.isOk() && hasPendingRequests()) { + LOG.warn("Unexpected requests without responses on drained physical stream, retrying."); + } } - failureHandler.throwIfNonEmpty(); + @Override + public void appendHtml(PrintWriter writer) { + writer.format("CommitWorkStream: %d pending", pending.size()); Review Comment: should we count only entries matching `e.getValue().getKey() == this` here? 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -68,10 +76,21 @@ final class GrpcGetDataStream private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST = StreamingGetDataRequest.newBuilder().build(); - /** @implNote {@link QueuedBatch} objects in the queue are is guarded by {@code this} */ + static final FluentBackoff BACK_OFF_FACTORY = + FluentBackoff.DEFAULT + .withInitialBackoff(Duration.millis(10)) + .withMaxBackoff(Duration.standardSeconds(10)); + + /** + * @implNote {@link QueuedBatch} objects in the queue should also be guarded by {@code this}. + * Batches should be sent from the front of the queue and only removed from the queue once + * added to the pending set of a physical stream. + */ + @GuardedBy("this") private final Deque<QueuedBatch> batches; - private final Map<Long, AppendableInputStream> pending; + private final Supplier<Integer> batchesDebugSizeSupplier; Review Comment: Thoughts on replacing this with the following? ` private final ConcurrentLinkedDeque<QueuedBatch> batches; @SuppressWarnings("GuardedBy") private int getBatchesSize() { return batches.size(); }` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -342,12 +419,15 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon } } catch (IOException e) { LOG.error("Parsing GetData response failed: ", e); + try { + BackOffUtils.next(Sleeper.DEFAULT, backoff); + } catch (IOException | InterruptedException ie) { + Thread.currentThread().interrupt(); Review Comment: Why interrupt on IOException? I'm also not sure why `BackOffUtils.next` declares `throws IOException`; it doesn't look like any implementation throws it. 
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java: ########## @@ -57,7 +60,9 @@ final class GrpcCommitWorkStream private static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE; - private final ConcurrentMap<Long, PendingRequest> pending; + private final ConcurrentMap<Long, KV<CommitWorkPhysicalStreamHandler, PendingRequest>> pending = Review Comment: It looks like we move all pending requests to the new stream in onNewStream, so it is not clear to me whether multiple CommitWorkPhysicalStreamHandlers will be in use at the same time. Will that change in the following PRs? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org