arunpandianp commented on code in PR #35327:
URL: https://github.com/apache/beam/pull/35327#discussion_r2160964856


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -139,62 +152,83 @@ public CommitWorkStream.RequestBatcher batcher() {
   }
 
   @Override
-  protected boolean hasPendingRequests() {
-    return !pending.isEmpty();
-  }
-
-  @Override
-  protected void sendHealthCheck() throws WindmillStreamShutdownException {
-    if (hasPendingRequests()) {
+  protected synchronized void sendHealthCheck() throws WindmillStreamShutdownException {
+    if (currentPhysicalStream.hasPendingRequests()) {
       StreamingCommitWorkRequest.Builder builder = StreamingCommitWorkRequest.newBuilder();
       builder.addCommitChunkBuilder().setRequestId(HEARTBEAT_REQUEST_ID);
       trySend(builder.build());
     }
   }
 
-  @Override
-  protected void onResponse(StreamingCommitResponse response) {
-    CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler();
-    for (int i = 0; i < response.getRequestIdCount(); ++i) {
-      long requestId = response.getRequestId(i);
-      if (requestId == HEARTBEAT_REQUEST_ID) {
-        continue;
-      }
+  private class CommitWorkPhysicalStreamHandler extends PhysicalStreamHandler {
+    @Override
+    public void onResponse(StreamingCommitResponse response) {
+      CommitCompletionFailureHandler failureHandler = new CommitCompletionFailureHandler();
+      for (int i = 0; i < response.getRequestIdCount(); ++i) {
+        long requestId = response.getRequestId(i);
+        if (requestId == HEARTBEAT_REQUEST_ID) {
+          continue;
+        }
+
+        // From windmill.proto: Indices must line up with the request_id field, but trailing OKs
+        // may be omitted.
+        CommitStatus commitStatus =
+            i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;
 
-      // From windmill.proto: Indices must line up with the request_id field, but trailing OKs may
-      // be omitted.
-      CommitStatus commitStatus =
-          i < response.getStatusCount() ? response.getStatus(i) : CommitStatus.OK;
-
-      @Nullable PendingRequest pendingRequest = pending.remove(requestId);
-      if (pendingRequest == null) {
-        synchronized (this) {
-          if (!isShutdown) {
-            // Missing responses are expected after shutdown() because it removes them.
-            LOG.error("Got unknown commit request ID: {}", requestId);
-          }
+        @Nullable
+        KV<CommitWorkPhysicalStreamHandler, PendingRequest> entry = pending.remove(requestId);
+        if (entry == null) {
+          LOG.error("Got unknown commit request ID: {}", requestId);
+          continue;
         }
-      } else {
+        if (entry.getKey() != this) {
+          LOG.error("Got commit request id {} on unexpected stream", 
requestId);
+        }
+        PendingRequest pendingRequest = entry.getValue();
         try {
           pendingRequest.completeWithStatus(commitStatus);
         } catch (RuntimeException e) {
-          // Catch possible exceptions to ensure that an exception for one commit does not prevent
+          // Catch possible exceptions to ensure that an exception for one commit does not
+          // prevent
           // other commits from being processed. Aggregate all the failures to throw after
           // processing the response if they exist.
           LOG.warn("Exception while processing commit response.", e);
           failureHandler.addError(commitStatus, e);
         }
       }
+
+      failureHandler.throwIfNonEmpty();
+    }
+
+    @Override
+    public boolean hasPendingRequests() {
+      return pending.entrySet().stream().anyMatch(e -> e.getValue().getKey() == this);
+    }
+
+    @Override
+    public void onDone(Status status) {
+      if (status.isOk() && hasPendingRequests()) {
+        LOG.warn("Unexpected requests without responses on drained physical 
stream, retrying.");
+      }
     }
 
-    failureHandler.throwIfNonEmpty();
+    @Override
+    public void appendHtml(PrintWriter writer) {
+      writer.format("CommitWorkStream: %d pending", pending.size());

Review Comment:
   should we count only entries matching `e.getValue().getKey() == this` here? 
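
   For illustration, a per-handler count might look like the sketch below. It assumes `pending` holds the `KV<CommitWorkPhysicalStreamHandler, PendingRequest>` values shown elsewhere in this diff; it is a sketch only, not a drop-in change.

   ```java
   @Override
   public void appendHtml(PrintWriter writer) {
     // Count only the pending requests owned by this physical stream handler.
     long pendingForThisHandler =
         pending.values().stream().filter(v -> v.getKey() == this).count();
     writer.format("CommitWorkStream: %d pending", pendingForThisHandler);
   }
   ```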



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -68,10 +76,21 @@ final class GrpcGetDataStream
   private static final StreamingGetDataRequest HEALTH_CHECK_REQUEST =
       StreamingGetDataRequest.newBuilder().build();
 
-  /** @implNote {@link QueuedBatch} objects in the queue are is guarded by {@code this} */
+  static final FluentBackoff BACK_OFF_FACTORY =
+      FluentBackoff.DEFAULT
+          .withInitialBackoff(Duration.millis(10))
+          .withMaxBackoff(Duration.standardSeconds(10));
+
+  /**
+   * @implNote {@link QueuedBatch} objects in the queue should also be guarded by {@code this}.
+   *     Batches should be sent from the front of the queue and only removed from the queue once
+   *     added to the pending set of a physical stream.
+   */
+  @GuardedBy("this")
   private final Deque<QueuedBatch> batches;
 
-  private final Map<Long, AppendableInputStream> pending;
+  private final Supplier<Integer> batchesDebugSizeSupplier;

Review Comment:
   thoughts on replacing this with the following?
   
   ```java
   private final ConcurrentLinkedDeque<QueuedBatch> batches;

   @SuppressWarnings("GuardedBy")
   private int getBatchesSize() {
     return batches.size();
   }
   ```
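
   One caveat worth noting: `ConcurrentLinkedDeque.size()` is an O(n) traversal and only weakly consistent under concurrent modification, which is probably acceptable for a debug-only counter but worth keeping in mind.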



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -342,12 +419,15 @@ private <ResponseT> ResponseT issueRequest(QueuedRequest request, ParseFn<Respon
         }
       } catch (IOException e) {
         LOG.error("Parsing GetData response failed: ", e);
+        try {
+          BackOffUtils.next(Sleeper.DEFAULT, backoff);
+        } catch (IOException | InterruptedException ie) {
+          Thread.currentThread().interrupt();

Review Comment:
   why interrupt on IOException? 
   
   I'm also not sure why `BackOffUtils.next` declares `throws IOException`; it doesn't look like any implementation actually throws it.
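
   For reference, a minimal sketch of the more conventional split of the two cases; `backoff`, `LOG`, and `Sleeper.DEFAULT` are taken from the diff above, the rest is illustrative rather than a drop-in patch:

   ```java
   try {
     // Sleeps per the backoff policy; the boolean return (retries remaining) is ignored here.
     BackOffUtils.next(Sleeper.DEFAULT, backoff);
   } catch (InterruptedException ie) {
     // Restore the interrupt flag only when the sleep was actually interrupted.
     Thread.currentThread().interrupt();
   } catch (IOException ioe) {
     // BackOff.nextBackOffMillis() declares IOException, but the implementations
     // used here don't appear to throw it; log and keep retrying.
     LOG.warn("Unexpected IOException from backoff.", ioe);
   }
   ```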



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStream.java:
##########
@@ -57,7 +60,9 @@ final class GrpcCommitWorkStream
 
   private static final long HEARTBEAT_REQUEST_ID = Long.MAX_VALUE;
 
-  private final ConcurrentMap<Long, PendingRequest> pending;
+  private final ConcurrentMap<Long, KV<CommitWorkPhysicalStreamHandler, PendingRequest>> pending =

Review Comment:
   It looks like we move all pending requests to the new stream in onNewStream, so it is not clear to me whether there will be multiple CommitWorkPhysicalStreamHandlers in use at the same time. Will that change in the following PRs?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
