XComp commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1274946566


##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {
+        try (final RestClient restClient =
+                new RestClient(new Configuration(), Executors.directExecutor())) {
+            restClient.close(); // Intentionally close the client prior to the request
+
+            CompletableFuture<?> future =
+                    restClient.sendRequest(
+                            unroutableIp,
+                            80,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance());
+
+            // Call get() on the future with a timeout of 0s so we can test that the exception
+            // thrown is not a TimeoutException, which is what would be thrown if restClient were
+            // not already closed
+            final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS);
+
+            final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause();
+            assertThat(cause, instanceOf(IllegalStateException.class));
+            assertThat(cause.getMessage(), equalTo("RestClient is already closed"));
+        }
+    }
+
+    @Test
+    public void testCloseClientWhileProcessingRequest() throws Exception {
+        // Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's
+        // request state machine, closing the client at a particular moment
+        final OneShotLatch connectTriggered = new OneShotLatch();
+        final OneShotLatch closeTriggered = new OneShotLatch();
+        final SelectStrategy fallbackSelectStrategy =
+                DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
+        final SelectStrategyFactory selectStrategyFactory =
+                () ->
+                        (selectSupplier, hasTasks) -> {
+                            connectTriggered.trigger();
+                            closeTriggered.awaitQuietly(1, TimeUnit.SECONDS);

Review Comment:
   ```suggestion
                               closeTriggered.awaitQuietly();
   ```
   The community agreed not to use timeouts in tests (see the
   [Flink Coding Conventions](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-timeouts-in-junit-tests))
   since they are a source of instability. Without a timeout, a test that hangs
   simply runs until the overall CI build is cancelled after ~4hrs.
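
As a rough sketch of the untimed two-latch handshake suggested above (not the actual test code): the example uses `java.util.concurrent.CountDownLatch` as a stand-in for Flink's `OneShotLatch`, and the class name `UntimedLatchHandshake` and the event labels are made up for illustration. A worker thread plays the role of the paused Netty select loop.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Hypothetical illustration; CountDownLatch stands in for Flink's OneShotLatch.
class UntimedLatchHandshake {

    /** Runs the handshake and returns the order in which the steps happened. */
    static List<String> run() throws InterruptedException {
        final CountDownLatch connectTriggered = new CountDownLatch(1);
        final CountDownLatch closeTriggered = new CountDownLatch(1);
        final List<String> events = Collections.synchronizedList(new ArrayList<>());

        // The worker mimics the select strategy: signal, then pause until released.
        Thread worker = new Thread(() -> {
            events.add("connect");
            connectTriggered.countDown();
            try {
                closeTriggered.await(); // no timeout: blocks until the test releases it
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            events.add("resume");
        });
        worker.start();

        connectTriggered.await();    // no timeout: wait until the worker has reached the pause
        events.add("close");         // the step under test happens while the worker is paused
        closeTriggered.countDown();  // release the worker

        worker.join();
        return events;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run());
    }
}
```

Because neither `await()` call has a deadline, a slow CI machine cannot turn the wait into a spurious failure; if the handshake genuinely deadlocks, the build-level timeout is what surfaces it.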



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {
+        try (final RestClient restClient =
+                new RestClient(new Configuration(), Executors.directExecutor())) {
+            restClient.close(); // Intentionally close the client prior to the request
+
+            CompletableFuture<?> future =
+                    restClient.sendRequest(
+                            unroutableIp,
+                            80,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance());
+
+            // Call get() on the future with a timeout of 0s so we can test that the exception
+            // thrown is not a TimeoutException, which is what would be thrown if restClient were
+            // not already closed
+            final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS);
+
+            final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause();
+            assertThat(cause, instanceOf(IllegalStateException.class));
+            assertThat(cause.getMessage(), equalTo("RestClient is already closed"));
+        }
+    }
+
+    @Test
+    public void testCloseClientWhileProcessingRequest() throws Exception {
+        // Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's
+        // request state machine, closing the client at a particular moment
+        final OneShotLatch connectTriggered = new OneShotLatch();
+        final OneShotLatch closeTriggered = new OneShotLatch();
+        final SelectStrategy fallbackSelectStrategy =
+                DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy();
+        final SelectStrategyFactory selectStrategyFactory =
+                () ->
+                        (selectSupplier, hasTasks) -> {
+                            connectTriggered.trigger();
+                            closeTriggered.awaitQuietly(1, TimeUnit.SECONDS);
+
+                            return fallbackSelectStrategy.calculateStrategy(
+                                    selectSupplier, hasTasks);
+                        };
+
+        try (final RestClient restClient =
+                new RestClient(
+                        new Configuration(), Executors.directExecutor(), selectStrategyFactory)) {
+            // Check that client's internal collection of pending response futures is empty prior to
+            // the request
+            assertThat(restClient.getResponseChannelFutures(), empty());
+
+            final CompletableFuture<?> requestFuture =
+                    restClient.sendRequest(
+                            unroutableIp,
+                            80,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance());
+
+            // Check that client's internal collection of pending response futures now has one
+            // entry, presumably due to the call to sendRequest
+            assertThat(restClient.getResponseChannelFutures(), hasSize(1));
+
+            // Wait for Netty to start connecting, then while it's paused in the SelectStrategy,
+            // close the client before unpausing Netty
+            connectTriggered.await(1, TimeUnit.SECONDS);

Review Comment:
   Same here: no timeout should be used.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -150,11 +159,30 @@ public static RestClient forUrl(Configuration configuration, Executor executor,
 
     public RestClient(Configuration configuration, Executor executor)
             throws ConfigurationException {
-        this(configuration, executor, null, -1);
+        this(configuration, executor, null, -1, DefaultSelectStrategyFactory.INSTANCE);

Review Comment:
   True, I overlooked that we're already doing it your way for the default port
   `-1`. Feel free to revert it if you want to be more consistent within this
   class :innocent:



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
