XComp commented on code in PR #23296:
URL: https://github.com/apache/flink/pull/23296#discussion_r1308286524


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/collect/CollectSinkOperatorCoordinatorTest.java:
##########
@@ -105,54 +102,147 @@ void testCloseAfterRequestIsReceived() throws Exception {
     }
 
     @Test
-    void testServerFailure() throws Exception {
-        CollectSinkOperatorCoordinator coordinator =
-                new CollectSinkOperatorCoordinator(SOCKET_TIMEOUT_MILLIS);
-        coordinator.start();
-
-        final String versionOfFailedRequest = "version3";
-        final CompletableFuture<CoordinationResponse> failedResponseFuture;
-        try (final TestingSocketServer socketServer =
-                
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
-
-            // a normal response
-            final List<Row> expectedData0 = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
-            final CompletableFuture<CoordinationResponse> responseFuture0 =
-                    coordinator.handleCoordinationRequest(
-                            createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData0);
-            assertResponseWithDefaultMetadataFromServer(responseFuture0, 
expectedData0);
+    void testSuccessfulResponse() throws Exception {
+        try (CollectSinkOperatorCoordinator testInstance = new 
CollectSinkOperatorCoordinator();
+                final TestingSocketServer socketServer =
+                        
TestingSocketServer.createSocketServerAndInitializeCoordinator(
+                                testInstance)) {
+            testInstance.start();
 
-            // a normal response
-            final List<Row> expectedData1 =
-                    Arrays.asList(Row.of(3, "ccc"), Row.of(4, "ddd"), 
Row.of(5, "eee"));
-            final CompletableFuture<CoordinationResponse> responseFuture1 =
-                    coordinator.handleCoordinationRequest(
+            final List<Row> expectedData = Arrays.asList(Row.of(1, "aaa"), 
Row.of(2, "bbb"));
+            final CompletableFuture<CoordinationResponse> responseFuture =
+                    testInstance.handleCoordinationRequest(
                             createRequestForServerGeneratedResponse());
-            socketServer.handleRequest(expectedData1);
-            assertResponseWithDefaultMetadataFromServer(responseFuture1, 
expectedData1);
+            assertThat(responseFuture).isNotDone();
+
+            socketServer.handleRequest(expectedData);
+
+            assertResponseWithDefaultMetadataFromServer(responseFuture, 
expectedData);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheServerSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            final CompletableFuture<CoordinationResponse> responseFuture;
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                
FlinkAssertions.assertThatFuture(socketServer.handleRequestWithoutResponse())
+                        .eventuallySucceeds();
+            }
+            assertEmptyResponseGeneratedFromServer(responseFuture);
+        }
+    }
+
+    @Test
+    void testServerSideClosingTheAcceptingSocket() throws Exception {
+        try (CollectSinkOperatorCoordinator coordinator = new 
CollectSinkOperatorCoordinator()) {
+            coordinator.start();
+
+            try (final TestingSocketServer socketServer =
+                    
TestingSocketServer.createSocketServerAndInitializeCoordinator(coordinator)) {
+                final String version = "version";
+                final CompletableFuture<CoordinationResponse> responseFuture =
+                        coordinator.handleCoordinationRequest(
+                                
createRequestForClientGeneratedResponse(version));
+                assertThat(responseFuture).isNotDone();
+
+                final CompletableFuture<List<Row>> dataFuture = new 
CompletableFuture<>();
+                final CompletableFuture<Void> serverSideProcess =
+                        socketServer.handleRequestAsync(dataFuture);
+                assertThat(serverSideProcess).isNotDone();
+
+                // wait for the connection to be established
+                socketServer.waitForConnectionToBeEstablished();
+                // in order to close the connection from the server's side
+                socketServer.closeAcceptingSocket();
+
+                assertEmptyResponseGeneratedFromClient(responseFuture, 
version);
+
+                assertThat(serverSideProcess).isNotDone();
 
-            // server closes here
-            failedResponseFuture =
+                dataFuture.complete(Collections.singletonList(Row.of(123, 
"abc")));
+
+                FlinkAssertions.assertThatFuture(serverSideProcess)
+                        .eventuallyFailsWith(ExecutionException.class)
+                        .withCauseInstanceOf(SocketException.class);

Review Comment:
   I also removed the lines above (see 
[comment](https://github.com/apache/flink/pull/23296#discussion_r1308284339)). 
Injecting the data is not necessary in 
`testClosingTheListeningSocketInTheSinkFunction` (aka 
`testServerSideClosingTheAcceptingSocket`) either.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to