arunpandianp commented on code in PR #35523:
URL: https://github.com/apache/beam/pull/35523#discussion_r2259810839


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java:
##########
@@ -467,22 +504,61 @@ public void onCompleted() {
     }
   }
 
-  @SuppressWarnings("nullness")
-  private void clearPhysicalStreamForDebug() {
-    currentPhysicalStreamForDebug.set(null);
+  @SuppressWarnings("ReferenceEquality")
+  private void onHalfClosePhysicalStreamTimeout(PhysicalStreamHandler handler) {
+    synchronized (this) {
+      if (currentPhysicalStream != handler || clientClosed || isShutdown) {
+        return;
+      }
+      handler.streamDebugMetrics.recordHalfClose();
+      closingPhysicalStreams.add(handler);
+      clearCurrentPhysicalStream(false);
+      try {
+        requestObserver.onCompleted();
+      } catch (Exception e) {
+        logger.debug(
+            "Exception while half-closing handler, onPhysicalStreamCompletion will for the stream",

Review Comment:
   ```suggestion
               "Exception while half-closing handler, onPhysicalStreamCompletion for the stream",
   ```



##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java:
##########
@@ -402,7 +419,12 @@ protected synchronized void shutdownInternal() {
 
   @Override
   public void appendSpecificHtml(PrintWriter writer) {
-    writer.format("GetDataStream: %d queued batches", batchesDebugSizeSupplier.get());
+    int batches = batchesDebugSizeSupplier.get();
+    if (batches > 0) {
+      writer.format("GetDataStream: %d queued batches ", batches);
+    } else {
+      writer.append("GetDataStream: no queued ");

Review Comment:
   ```suggestion
         writer.append("GetDataStream: no queued batches");
   ```



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##########
@@ -459,6 +463,727 @@ public void testSend_notCalledAfterShutdown_Multichunk()
     assertThat(streamInfo.requests).isEmpty();
   }
 
+  private Windmill.WorkItemCommitRequest createTestCommit(int id) {
+    return Windmill.WorkItemCommitRequest.newBuilder()
+        .setKey(ByteString.EMPTY)
+        .setShardingKey(id)
+        .setWorkToken(id * 100L)
+        .setCacheToken(id * 1000L)
+        .build();
+  }
+
+  @Test
+  public void testCommitWorkItem_multiplePhysicalStreams() throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo = 
waitForConnectionAndConsumeHeader();
+
+    // Send a request where the response is captured in a future.
+    Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest, 
commitStatusFuture::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take();
+    assertThat(request.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest).isEqualTo(workItemCommitRequest);
+
+    // Trigger a new stream to be created due to handover.
+    assertTrue(triggeredExecutor.unblockNextFuture());

Review Comment:
   Can we add a comment saying that this triggers the halfCloseFuture in
   AbstractWindmillStream? It took me a bit to figure out how the new stream
   is getting created.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java:
##########
@@ -459,6 +463,727 @@ public void testSend_notCalledAfterShutdown_Multichunk()
     assertThat(streamInfo.requests).isEmpty();
   }
 
+  private Windmill.WorkItemCommitRequest createTestCommit(int id) {
+    return Windmill.WorkItemCommitRequest.newBuilder()
+        .setKey(ByteString.EMPTY)
+        .setShardingKey(id)
+        .setWorkToken(id * 100L)
+        .setCacheToken(id * 1000L)
+        .build();
+  }
+
+  @Test
+  public void testCommitWorkItem_multiplePhysicalStreams() throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo = 
waitForConnectionAndConsumeHeader();
+
+    // Send a request where the response is captured in a future.
+    Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest, 
commitStatusFuture::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take();
+    assertThat(request.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest).isEqualTo(workItemCommitRequest);
+
+    // Trigger a new stream to be created due to handover.
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    fakeService.expectNoMoreStreams();
+
+    // Previous stream client should be half-closed.
+    assertNull(streamInfo.onDone.get());
+
+    Windmill.WorkItemCommitRequest workItemCommitRequest2 = 
createTestCommit(2);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest2, 
commitStatusFuture2::complete));
+    }
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2);
+
+    streamInfo2.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build());
+
+    streamInfo.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build());
+    assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK);
+    assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK);
+
+    // Complete server-side half-close of first stream. No new
+    // stream should be created since the current stream is active.
+    streamInfo.responseObserver.onCompleted();
+
+    // Close the stream, the open stream should be client half-closed
+    // but logical remains not terminated.
+    commitWorkStream.halfClose();
+    assertNull(streamInfo2.onDone.get());
+    assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS));
+
+    // Complete half-closing from the server and verify shutdown completes.
+    streamInfo2.responseObserver.onCompleted();
+
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testCommitWorkItem_multiplePhysicalStreams_OldStreamFails() 
throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo = 
waitForConnectionAndConsumeHeader();
+
+    Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest, 
commitStatusFuture::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take();
+    assertThat(request.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest).isEqualTo(workItemCommitRequest);
+
+    // A new stream should be created due to handover.
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    fakeService.expectNoMoreStreams();
+
+    // Previous stream client should be half-closed.
+    assertNull(streamInfo.onDone.get());
+
+    Windmill.WorkItemCommitRequest workItemCommitRequest2 = 
createTestCommit(2);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest2, 
commitStatusFuture2::complete));
+    }
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2);
+
+    streamInfo2.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build());
+    assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK);
+
+    // Complete first stream with an error. No new
+    // stream should be created since the current stream is active. The 
request should have an
+    // error and the request should be retried on the new stream.
+    streamInfo.responseObserver.onError(new RuntimeException("test error"));
+    Windmill.StreamingCommitWorkRequest request3 = streamInfo2.requests.take();
+    assertThat(request3.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest3 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request3.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest3).isEqualTo(workItemCommitRequest);
+
+    // Close the stream, the open stream should be client half-closed
+    // but logical remains not terminated.
+    commitWorkStream.halfClose();
+    assertNull(streamInfo2.onDone.get());
+    assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS));
+
+    streamInfo2.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build());
+    assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK);
+
+    // Complete half-closing from the server and verify shutdown completes.
+    streamInfo2.responseObserver.onCompleted();
+
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void 
testCommitWorkItem_multiplePhysicalStreams_newStreamFailsWhileEmpty()
+      throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo = 
waitForConnectionAndConsumeHeader();
+
+    Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest, 
commitStatusFuture::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take();
+    assertThat(request.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest).isEqualTo(workItemCommitRequest);
+
+    // A new stream should be created due to handover.
+    assertTrue(triggeredExecutor.unblockNextFuture());
+
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo.onDone.get());
+
+    // Before stream 1 is finished simulate stream 2 failing.
+    streamInfo2.responseObserver.onError(new IOException("stream 2 failed"));
+    // A new stream should be created and handle new requests.
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo.onDone.get());
+
+    Windmill.WorkItemCommitRequest workItemCommitRequest2 = 
createTestCommit(2);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest2, 
commitStatusFuture2::complete));
+    }
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo3.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2);
+
+    streamInfo3.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build());
+
+    streamInfo.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build());
+    assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK);
+    assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK);
+
+    // Close the stream.
+    commitWorkStream.halfClose();
+    assertNull(streamInfo.onDone.get());
+    fakeService.expectNoMoreStreams();
+    streamInfo.responseObserver.onCompleted();
+    streamInfo3.responseObserver.onCompleted();
+
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void 
testCommitWorkItem_multiplePhysicalStreams_newStreamFailsWithRequests()
+      throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo = 
waitForConnectionAndConsumeHeader();
+
+    Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest, 
commitStatusFuture::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take();
+    assertThat(request.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest).isEqualTo(workItemCommitRequest);
+
+    // A new stream should be created due to handover.
+    assertTrue(triggeredExecutor.unblockNextFuture());
+
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo.onDone.get());
+
+    Windmill.WorkItemCommitRequest workItemCommitRequest2 = 
createTestCommit(2);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest2, 
commitStatusFuture2::complete));
+    }
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2);
+
+    // Before stream 1 is finished simulate stream 2 failing.
+    streamInfo2.responseObserver.onError(new IOException("stream 2 failed"));
+    // A new stream should be created and receive the pending requests from 
stream2 but not the
+    // request from stream1.
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo.onDone.get());
+    Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take();
+    assertThat(request3.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest3 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request3.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest3).isEqualTo(workItemCommitRequest2);
+
+    streamInfo3.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build());
+
+    streamInfo.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build());
+    assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK);
+    assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK);
+
+    // Close the stream.
+    commitWorkStream.halfClose();
+    assertNull(streamInfo.onDone.get());
+    fakeService.expectNoMoreStreams();
+    streamInfo.responseObserver.onCompleted();
+    streamInfo3.responseObserver.onCompleted();
+
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testCommitWorkItem_multiplePhysicalStreams_multipleHandovers() 
throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = 
waitForConnectionAndConsumeHeader();
+
+    // Commit request 1 on stream 1
+    Windmill.WorkItemCommitRequest workItemCommitRequest1 = 
createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest1, 
commitStatusFuture1::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take();
+    assertThat(request1.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest1 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request1.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1);
+
+    // Trigger handover 1
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo1.onDone.get());
+
+    // Commit request 2 on stream 2
+    Windmill.WorkItemCommitRequest workItemCommitRequest2 = 
createTestCommit(2);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest2, 
commitStatusFuture2::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2);
+
+    // Trigger handover 2 before streamInfo2 completes
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo2.onDone.get());
+
+    // Commit request 3 on stream 3
+    Windmill.WorkItemCommitRequest workItemCommitRequest3 = 
createTestCommit(3);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture3 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest3, 
commitStatusFuture3::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take();
+    assertThat(request3.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest3 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request3.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest3).isEqualTo(workItemCommitRequest3);
+
+    // Respond to all requests
+    streamInfo1.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build());
+    streamInfo2.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build());
+    streamInfo3.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(3).build());
+
+    assertThat(commitStatusFuture1.get()).isEqualTo(Windmill.CommitStatus.OK);
+    assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK);
+    assertThat(commitStatusFuture3.get()).isEqualTo(Windmill.CommitStatus.OK);
+
+    // Close the stream
+    commitWorkStream.halfClose();
+    assertNull(streamInfo3.onDone.get());
+
+    // Verify no more streams
+    fakeService.expectNoMoreStreams();
+    streamInfo1.responseObserver.onCompleted();
+    streamInfo2.responseObserver.onCompleted();
+    streamInfo3.responseObserver.onCompleted();
+
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void 
testCommitWorkItem_multiplePhysicalStreams_OldStreamFailsWhileNewStreamInBackoff()
+      throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = 
waitForConnectionAndConsumeHeader();
+
+    // Commit request 1 on stream 1
+    Windmill.WorkItemCommitRequest workItemCommitRequest1 = 
createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest1, 
commitStatusFuture1::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take();
+    assertThat(request1.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest1 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request1.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1);
+
+    // Trigger handover but fail new connections
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    fakeService.failConnectionsAndWait(1);
+    assertNull(streamInfo1.onDone.get());
+
+    // Fail first stream
+    streamInfo1.responseObserver.onError(new RuntimeException("test error"));
+
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    fakeService.expectNoMoreStreams();
+
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest1);
+
+    // Respond to the request
+    streamInfo2.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build());
+    assertThat(commitStatusFuture1.get()).isEqualTo(Windmill.CommitStatus.OK);
+
+    // Close the stream
+    commitWorkStream.halfClose();
+    assertNull(streamInfo2.onDone.get());
+
+    streamInfo2.responseObserver.onCompleted();
+
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void 
testCommitWorkItem_multiplePhysicalStreams_multipleHandovers_shutdown()
+      throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = 
waitForConnectionAndConsumeHeader();
+
+    // Commit request 1 on stream 1
+    Windmill.WorkItemCommitRequest workItemCommitRequest1 = 
createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest1, 
commitStatusFuture1::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take();
+    assertThat(request1.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest1 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request1.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1);
+
+    // Trigger handover 1
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo1.onDone.get());
+
+    // Commit request 2 on stream 2
+    Windmill.WorkItemCommitRequest workItemCommitRequest2 = 
createTestCommit(2);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest2, 
commitStatusFuture2::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2);
+
+    // Trigger handover 2 before streamInfo2 completes
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo2.onDone.get());
+
+    // Commit request 3 on stream 3
+    Windmill.WorkItemCommitRequest workItemCommitRequest3 = 
createTestCommit(3);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture3 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest3, 
commitStatusFuture3::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take();
+    assertThat(request3.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest3 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request3.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest3).isEqualTo(workItemCommitRequest3);
+
+    // Shutdown while there are active streams and verify it isn't completed 
until all the streams
+    // are done.
+    fakeService.expectNoMoreStreams();
+    assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.SECONDS));
+    commitWorkStream.shutdown();
+    assertThat(commitStatusFuture1.isDone()).isTrue();
+    assertThat(commitStatusFuture2.isDone()).isTrue();
+    assertThat(commitStatusFuture3.isDone()).isTrue();
+    assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS));
+
+    assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS));
+    streamInfo3.responseObserver.onCompleted();
+    assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS));
+    streamInfo1.responseObserver.onCompleted();
+    assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS));
+    streamInfo2.responseObserver.onError(new RuntimeException("test"));
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void 
testCommitWorkItem_multiplePhysicalStreams_multipleHandovers_halfClose()
+      throws Exception {
+    TriggeredScheduledExecutorService triggeredExecutor = new 
TriggeredScheduledExecutorService();
+    GrpcCommitWorkStream commitWorkStream =
+        (GrpcCommitWorkStream)
+            GrpcWindmillStreamFactory.of(TEST_JOB_HEADER)
+                
.setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60))
+                .setScheduledExecutorService(triggeredExecutor)
+                .build()
+                .createDirectCommitWorkStream(
+                    WindmillConnection.builder()
+                        .setStubSupplier(
+                            () -> 
CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel))
+                        .build());
+    commitWorkStream.start();
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = 
waitForConnectionAndConsumeHeader();
+
+    // Commit request 1 on stream 1
+    Windmill.WorkItemCommitRequest workItemCommitRequest1 = 
createTestCommit(1);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest1, 
commitStatusFuture1::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take();
+    assertThat(request1.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest1 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request1.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1);
+
+    // Trigger handover 1
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo1.onDone.get());
+
+    // Commit request 2 on stream 2
+    Windmill.WorkItemCommitRequest workItemCommitRequest2 = 
createTestCommit(2);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest2, 
commitStatusFuture2::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take();
+    assertThat(request2.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest2 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request2.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2);
+
+    // Trigger handover 2 before streamInfo2 completes
+    assertTrue(triggeredExecutor.unblockNextFuture());
+    FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = 
waitForConnectionAndConsumeHeader();
+    assertNull(streamInfo2.onDone.get());
+
+    // Commit request 3 on stream 3
+    Windmill.WorkItemCommitRequest workItemCommitRequest3 = 
createTestCommit(3);
+    CompletableFuture<Windmill.CommitStatus> commitStatusFuture3 = new 
CompletableFuture<>();
+    try (WindmillStream.CommitWorkStream.RequestBatcher batcher = 
commitWorkStream.batcher()) {
+      assertTrue(
+          batcher.commitWorkItem(
+              COMPUTATION_ID, workItemCommitRequest3, 
commitStatusFuture3::complete));
+    }
+
+    Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take();
+    assertThat(request3.getCommitChunkList()).hasSize(1);
+    Windmill.WorkItemCommitRequest parsedRequest3 =
+        Windmill.WorkItemCommitRequest.parseFrom(
+            request3.getCommitChunk(0).getSerializedWorkItemCommit());
+    assertThat(parsedRequest3).isEqualTo(workItemCommitRequest3);
+
+    // Half-close while there are active streams and verify the stream doesn't terminate
+    // until all the streams are done.
+    fakeService.expectNoMoreStreams();
+    assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.SECONDS));
+    commitWorkStream.halfClose();
+
+    assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS));
+    assertThat(streamInfo3.onDone.get()).isNull();
+
+    assertThat(commitStatusFuture1.isDone()).isFalse();
+    assertThat(commitStatusFuture2.isDone()).isFalse();
+    assertThat(commitStatusFuture3.isDone()).isFalse();
+
+    streamInfo3.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder().addRequestId(3).build());
+    streamInfo3.responseObserver.onCompleted();
+    assertThat(commitStatusFuture3.get()).isEqualTo(Windmill.CommitStatus.OK);
+    assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS));
+
+    streamInfo1.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder()
+            .addRequestId(1)
+            .addStatus(Windmill.CommitStatus.ABORTED)
+            .build());
+    streamInfo1.responseObserver.onCompleted();
+    
assertThat(commitStatusFuture1.get()).isEqualTo(Windmill.CommitStatus.ABORTED);
+    assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS));
+
+    streamInfo2.responseObserver.onNext(
+        Windmill.StreamingCommitResponse.newBuilder()
+            .addRequestId(2)
+            .addStatus(Windmill.CommitStatus.ALREADY_IN_COMMIT)
+            .build());
+    streamInfo2.responseObserver.onCompleted();
+    
assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.ALREADY_IN_COMMIT);
+
+    assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS));
+  }
+
+  // XXX add handover tests needed such as:

Review Comment:
   Can this XXX note be removed now that the handover tests above have been added?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to