arunpandianp commented on code in PR #35523: URL: https://github.com/apache/beam/pull/35523#discussion_r2259810839
########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/AbstractWindmillStream.java: ########## @@ -467,22 +504,61 @@ public void onCompleted() { } } - @SuppressWarnings("nullness") - private void clearPhysicalStreamForDebug() { - currentPhysicalStreamForDebug.set(null); + @SuppressWarnings("ReferenceEquality") + private void onHalfClosePhysicalStreamTimeout(PhysicalStreamHandler handler) { + synchronized (this) { + if (currentPhysicalStream != handler || clientClosed || isShutdown) { + return; + } + handler.streamDebugMetrics.recordHalfClose(); + closingPhysicalStreams.add(handler); + clearCurrentPhysicalStream(false); + try { + requestObserver.onCompleted(); + } catch (Exception e) { + logger.debug( + "Exception while half-closing handler, onPhysicalStreamCompletion will for the stream", Review Comment: ```suggestion "Exception while half-closing handler, onPhysicalStreamCompletion for the stream", ``` ########## runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcGetDataStream.java: ########## @@ -402,7 +419,12 @@ protected synchronized void shutdownInternal() { @Override public void appendSpecificHtml(PrintWriter writer) { - writer.format("GetDataStream: %d queued batches", batchesDebugSizeSupplier.get()); + int batches = batchesDebugSizeSupplier.get(); + if (batches > 0) { + writer.format("GetDataStream: %d queued batches ", batches); + } else { + writer.append("GetDataStream: no queued "); Review Comment: ```suggestion writer.append("GetDataStream: no queued batches"); ``` ########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java: ########## @@ -459,6 +463,727 @@ public void testSend_notCalledAfterShutdown_Multichunk() assertThat(streamInfo.requests).isEmpty(); } + private Windmill.WorkItemCommitRequest createTestCommit(int id) 
{ + return Windmill.WorkItemCommitRequest.newBuilder() + .setKey(ByteString.EMPTY) + .setShardingKey(id) + .setWorkToken(id * 100L) + .setCacheToken(id * 1000L) + .build(); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams() throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo = waitForConnectionAndConsumeHeader(); + + // Send a request where the response is captured in a future. + Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest, commitStatusFuture::complete)); + } + + Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take(); + assertThat(request.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest = + Windmill.WorkItemCommitRequest.parseFrom( + request.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest).isEqualTo(workItemCommitRequest); + + // Trigger a new stream to be created due to handover. + assertTrue(triggeredExecutor.unblockNextFuture()); Review Comment: Can we add a comment saying that this triggers the halfCloseFuture in AbstractWindmillStream? It took me a while to figure out how the new stream gets created.
########## runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/windmill/client/grpc/GrpcCommitWorkStreamTest.java: ########## @@ -459,6 +463,727 @@ public void testSend_notCalledAfterShutdown_Multichunk() assertThat(streamInfo.requests).isEmpty(); } + private Windmill.WorkItemCommitRequest createTestCommit(int id) { + return Windmill.WorkItemCommitRequest.newBuilder() + .setKey(ByteString.EMPTY) + .setShardingKey(id) + .setWorkToken(id * 100L) + .setCacheToken(id * 1000L) + .build(); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams() throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo = waitForConnectionAndConsumeHeader(); + + // Send a request where the response is captured in a future. 
+ Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest, commitStatusFuture::complete)); + } + + Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take(); + assertThat(request.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest = + Windmill.WorkItemCommitRequest.parseFrom( + request.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest).isEqualTo(workItemCommitRequest); + + // Trigger a new stream to be created due to handover. + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + fakeService.expectNoMoreStreams(); + + // Previous stream client should be half-closed. 
+ assertNull(streamInfo.onDone.get()); + + Windmill.WorkItemCommitRequest workItemCommitRequest2 = createTestCommit(2); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest2, commitStatusFuture2::complete)); + } + Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2); + + streamInfo2.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build()); + + streamInfo.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build()); + assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK); + assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK); + + // Complete server-side half-close of first stream. No new + // stream should be created since the current stream is active. + streamInfo.responseObserver.onCompleted(); + + // Close the stream, the open stream should be client half-closed + // but logical remains not terminated. + commitWorkStream.halfClose(); + assertNull(streamInfo2.onDone.get()); + assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS)); + + // Complete half-closing from the server and verify shutdown completes. 
+ streamInfo2.responseObserver.onCompleted(); + + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams_OldStreamFails() throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo = waitForConnectionAndConsumeHeader(); + + Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest, commitStatusFuture::complete)); + } + + Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take(); + assertThat(request.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest = + Windmill.WorkItemCommitRequest.parseFrom( + request.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest).isEqualTo(workItemCommitRequest); + + // A new stream should be created due to handover. + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + fakeService.expectNoMoreStreams(); + + // Previous stream client should be half-closed. 
+ assertNull(streamInfo.onDone.get()); + + Windmill.WorkItemCommitRequest workItemCommitRequest2 = createTestCommit(2); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest2, commitStatusFuture2::complete)); + } + Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2); + + streamInfo2.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build()); + assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK); + + // Complete first stream with an error. No new + // stream should be created since the current stream is active. The request should have an + // error and the request should be retried on the new stream. + streamInfo.responseObserver.onError(new RuntimeException("test error")); + Windmill.StreamingCommitWorkRequest request3 = streamInfo2.requests.take(); + assertThat(request3.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest3 = + Windmill.WorkItemCommitRequest.parseFrom( + request3.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest3).isEqualTo(workItemCommitRequest); + + // Close the stream, the open stream should be client half-closed + // but logical remains not terminated. 
+ commitWorkStream.halfClose(); + assertNull(streamInfo2.onDone.get()); + assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS)); + + streamInfo2.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build()); + assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK); + + // Complete half-closing from the server and verify shutdown completes. + streamInfo2.responseObserver.onCompleted(); + + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams_newStreamFailsWhileEmpty() + throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo = waitForConnectionAndConsumeHeader(); + + Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest, commitStatusFuture::complete)); + } + + Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take(); + assertThat(request.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest = + Windmill.WorkItemCommitRequest.parseFrom( + request.getCommitChunk(0).getSerializedWorkItemCommit()); + 
assertThat(parsedRequest).isEqualTo(workItemCommitRequest); + + // A new stream should be created due to handover. + assertTrue(triggeredExecutor.unblockNextFuture()); + + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo.onDone.get()); + + // Before stream 1 is finished simulate stream 2 failing. + streamInfo2.responseObserver.onError(new IOException("stream 2 failed")); + // A new stream should be created and handle new requests. + FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo.onDone.get()); + + Windmill.WorkItemCommitRequest workItemCommitRequest2 = createTestCommit(2); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest2, commitStatusFuture2::complete)); + } + Windmill.StreamingCommitWorkRequest request2 = streamInfo3.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2); + + streamInfo3.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build()); + + streamInfo.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build()); + assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK); + assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK); + + // Close the stream. 
+ commitWorkStream.halfClose(); + assertNull(streamInfo.onDone.get()); + fakeService.expectNoMoreStreams(); + streamInfo.responseObserver.onCompleted(); + streamInfo3.responseObserver.onCompleted(); + + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams_newStreamFailsWithRequests() + throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo = waitForConnectionAndConsumeHeader(); + + Windmill.WorkItemCommitRequest workItemCommitRequest = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest, commitStatusFuture::complete)); + } + + Windmill.StreamingCommitWorkRequest request = streamInfo.requests.take(); + assertThat(request.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest = + Windmill.WorkItemCommitRequest.parseFrom( + request.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest).isEqualTo(workItemCommitRequest); + + // A new stream should be created due to handover. 
+ assertTrue(triggeredExecutor.unblockNextFuture()); + + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo.onDone.get()); + + Windmill.WorkItemCommitRequest workItemCommitRequest2 = createTestCommit(2); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest2, commitStatusFuture2::complete)); + } + Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2); + + // Before stream 1 is finished simulate stream 2 failing. + streamInfo2.responseObserver.onError(new IOException("stream 2 failed")); + // A new stream should be created and receive the pending requests from stream2 but not the + // request from stream1. 
+ FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo.onDone.get()); + Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take(); + assertThat(request3.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest3 = + Windmill.WorkItemCommitRequest.parseFrom( + request3.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest3).isEqualTo(workItemCommitRequest2); + + streamInfo3.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build()); + + streamInfo.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build()); + assertThat(commitStatusFuture.get()).isEqualTo(Windmill.CommitStatus.OK); + assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK); + + // Close the stream. + commitWorkStream.halfClose(); + assertNull(streamInfo.onDone.get()); + fakeService.expectNoMoreStreams(); + streamInfo.responseObserver.onCompleted(); + streamInfo3.responseObserver.onCompleted(); + + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams_multipleHandovers() throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = waitForConnectionAndConsumeHeader(); + + // Commit request 1 on stream 1 + Windmill.WorkItemCommitRequest 
workItemCommitRequest1 = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest1, commitStatusFuture1::complete)); + } + + Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take(); + assertThat(request1.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest1 = + Windmill.WorkItemCommitRequest.parseFrom( + request1.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1); + + // Trigger handover 1 + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo1.onDone.get()); + + // Commit request 2 on stream 2 + Windmill.WorkItemCommitRequest workItemCommitRequest2 = createTestCommit(2); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest2, commitStatusFuture2::complete)); + } + + Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2); + + // Trigger handover 2 before streamInfo2 completes + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo2.onDone.get()); + + // Commit request 3 on stream 3 + 
Windmill.WorkItemCommitRequest workItemCommitRequest3 = createTestCommit(3); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture3 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest3, commitStatusFuture3::complete)); + } + + Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take(); + assertThat(request3.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest3 = + Windmill.WorkItemCommitRequest.parseFrom( + request3.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest3).isEqualTo(workItemCommitRequest3); + + // Respond to all requests + streamInfo1.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build()); + streamInfo2.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(2).build()); + streamInfo3.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(3).build()); + + assertThat(commitStatusFuture1.get()).isEqualTo(Windmill.CommitStatus.OK); + assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.OK); + assertThat(commitStatusFuture3.get()).isEqualTo(Windmill.CommitStatus.OK); + + // Close the stream + commitWorkStream.halfClose(); + assertNull(streamInfo3.onDone.get()); + + // Verify no more streams + fakeService.expectNoMoreStreams(); + streamInfo1.responseObserver.onCompleted(); + streamInfo2.responseObserver.onCompleted(); + streamInfo3.responseObserver.onCompleted(); + + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams_OldStreamFailsWhileNewStreamInBackoff() + throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + 
(GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = waitForConnectionAndConsumeHeader(); + + // Commit request 1 on stream 1 + Windmill.WorkItemCommitRequest workItemCommitRequest1 = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest1, commitStatusFuture1::complete)); + } + + Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take(); + assertThat(request1.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest1 = + Windmill.WorkItemCommitRequest.parseFrom( + request1.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1); + + // Trigger handover but fail new connections + assertTrue(triggeredExecutor.unblockNextFuture()); + fakeService.failConnectionsAndWait(1); + assertNull(streamInfo1.onDone.get()); + + // Fail first stream + streamInfo1.responseObserver.onError(new RuntimeException("test error")); + + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + fakeService.expectNoMoreStreams(); + + Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + 
assertThat(parsedRequest2).isEqualTo(workItemCommitRequest1); + + // Respond to the request + streamInfo2.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(1).build()); + assertThat(commitStatusFuture1.get()).isEqualTo(Windmill.CommitStatus.OK); + + // Close the stream + commitWorkStream.halfClose(); + assertNull(streamInfo2.onDone.get()); + + streamInfo2.responseObserver.onCompleted(); + + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams_multipleHandovers_shutdown() + throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = waitForConnectionAndConsumeHeader(); + + // Commit request 1 on stream 1 + Windmill.WorkItemCommitRequest workItemCommitRequest1 = createTestCommit(1); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest1, commitStatusFuture1::complete)); + } + + Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take(); + assertThat(request1.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest1 = + Windmill.WorkItemCommitRequest.parseFrom( + request1.getCommitChunk(0).getSerializedWorkItemCommit()); + 
assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1); + + // Trigger handover 1 + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo1.onDone.get()); + + // Commit request 2 on stream 2 + Windmill.WorkItemCommitRequest workItemCommitRequest2 = createTestCommit(2); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest2, commitStatusFuture2::complete)); + } + + Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2); + + // Trigger handover 2 before streamInfo2 completes + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo2.onDone.get()); + + // Commit request 3 on stream 3 + Windmill.WorkItemCommitRequest workItemCommitRequest3 = createTestCommit(3); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture3 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest3, commitStatusFuture3::complete)); + } + + Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take(); + assertThat(request3.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest3 = + Windmill.WorkItemCommitRequest.parseFrom( + 
request3.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest3).isEqualTo(workItemCommitRequest3); + + // Shutdown while there are active streams and verify it isn't completed until all the streams + // are done. + fakeService.expectNoMoreStreams(); + assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.SECONDS)); + commitWorkStream.shutdown(); + assertThat(commitStatusFuture1.isDone()).isTrue(); + assertThat(commitStatusFuture2.isDone()).isTrue(); + assertThat(commitStatusFuture3.isDone()).isTrue(); + assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS)); + + assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS)); + streamInfo3.responseObserver.onCompleted(); + assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS)); + streamInfo1.responseObserver.onCompleted(); + assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS)); + streamInfo2.responseObserver.onError(new RuntimeException("test")); + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + @Test + public void testCommitWorkItem_multiplePhysicalStreams_multipleHandovers_halfClose() + throws Exception { + TriggeredScheduledExecutorService triggeredExecutor = new TriggeredScheduledExecutorService(); + GrpcCommitWorkStream commitWorkStream = + (GrpcCommitWorkStream) + GrpcWindmillStreamFactory.of(TEST_JOB_HEADER) + .setDirectStreamingRpcPhysicalStreamHalfCloseAfter(java.time.Duration.ofSeconds(60)) + .setScheduledExecutorService(triggeredExecutor) + .build() + .createDirectCommitWorkStream( + WindmillConnection.builder() + .setStubSupplier( + () -> CloudWindmillServiceV1Alpha1Grpc.newStub(inProcessChannel)) + .build()); + commitWorkStream.start(); + FakeWindmillGrpcService.CommitStreamInfo streamInfo1 = waitForConnectionAndConsumeHeader(); + + // Commit request 1 on stream 1 + Windmill.WorkItemCommitRequest workItemCommitRequest1 = createTestCommit(1); + 
CompletableFuture<Windmill.CommitStatus> commitStatusFuture1 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest1, commitStatusFuture1::complete)); + } + + Windmill.StreamingCommitWorkRequest request1 = streamInfo1.requests.take(); + assertThat(request1.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest1 = + Windmill.WorkItemCommitRequest.parseFrom( + request1.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest1).isEqualTo(workItemCommitRequest1); + + // Trigger handover 1 + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo2 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo1.onDone.get()); + + // Commit request 2 on stream 2 + Windmill.WorkItemCommitRequest workItemCommitRequest2 = createTestCommit(2); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture2 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest2, commitStatusFuture2::complete)); + } + + Windmill.StreamingCommitWorkRequest request2 = streamInfo2.requests.take(); + assertThat(request2.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest2 = + Windmill.WorkItemCommitRequest.parseFrom( + request2.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest2).isEqualTo(workItemCommitRequest2); + + // Trigger handover 2 before streamInfo2 completes + assertTrue(triggeredExecutor.unblockNextFuture()); + FakeWindmillGrpcService.CommitStreamInfo streamInfo3 = waitForConnectionAndConsumeHeader(); + assertNull(streamInfo2.onDone.get()); + + // Commit request 3 on stream 3 + Windmill.WorkItemCommitRequest workItemCommitRequest3 = 
createTestCommit(3); + CompletableFuture<Windmill.CommitStatus> commitStatusFuture3 = new CompletableFuture<>(); + try (WindmillStream.CommitWorkStream.RequestBatcher batcher = commitWorkStream.batcher()) { + assertTrue( + batcher.commitWorkItem( + COMPUTATION_ID, workItemCommitRequest3, commitStatusFuture3::complete)); + } + + Windmill.StreamingCommitWorkRequest request3 = streamInfo3.requests.take(); + assertThat(request3.getCommitChunkList()).hasSize(1); + Windmill.WorkItemCommitRequest parsedRequest3 = + Windmill.WorkItemCommitRequest.parseFrom( + request3.getCommitChunk(0).getSerializedWorkItemCommit()); + assertThat(parsedRequest3).isEqualTo(workItemCommitRequest3); + + // Shutdown while there are active streams and verify it isn't completed until all the streams + // are done. + fakeService.expectNoMoreStreams(); + assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.SECONDS)); + commitWorkStream.halfClose(); + + assertFalse(commitWorkStream.awaitTermination(10, TimeUnit.MILLISECONDS)); + assertThat(streamInfo3.onDone.get()).isNull(); + + assertThat(commitStatusFuture1.isDone()).isFalse(); + assertThat(commitStatusFuture2.isDone()).isFalse(); + assertThat(commitStatusFuture3.isDone()).isFalse(); + + streamInfo3.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder().addRequestId(3).build()); + streamInfo3.responseObserver.onCompleted(); + assertThat(commitStatusFuture3.get()).isEqualTo(Windmill.CommitStatus.OK); + assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS)); + + streamInfo1.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder() + .addRequestId(1) + .addStatus(Windmill.CommitStatus.ABORTED) + .build()); + streamInfo1.responseObserver.onCompleted(); + assertThat(commitStatusFuture1.get()).isEqualTo(Windmill.CommitStatus.ABORTED); + assertFalse(commitWorkStream.awaitTermination(0, TimeUnit.MILLISECONDS)); + + streamInfo2.responseObserver.onNext( + Windmill.StreamingCommitResponse.newBuilder() 
+ .addRequestId(2) + .addStatus(Windmill.CommitStatus.ALREADY_IN_COMMIT) + .build()); + streamInfo2.responseObserver.onCompleted(); + assertThat(commitStatusFuture2.get()).isEqualTo(Windmill.CommitStatus.ALREADY_IN_COMMIT); + + assertTrue(commitWorkStream.awaitTermination(10, TimeUnit.SECONDS)); + } + + // XXX add handover tests needed such as: Review Comment: can be removed now? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org