XComp commented on code in PR #22987:
URL: https://github.com/apache/flink/pull/22987#discussion_r1262773979


##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -501,6 +501,22 @@ private <P extends ResponseBody> CompletableFuture<P> submitRequest(
                     }
                 });
 
+        // [FLINK-32583] If connectFuture failed instantly but channelFuture is unresolved, it may
+        // mean the executor service Netty is using has shut down, in which case the above listener
+        // to complete channelFuture will never run
+        if (connectFuture.isDone() && !connectFuture.isSuccess() && !channelFuture.isDone()) {

Review Comment:
   Should we add a comment explaining why we're not handling the success case?
   Essentially, we're working around a bug in the Netty code, aren't we?
   ```
       /**
        * Adds the specified listener to this future.  The
        * specified listener is notified when this future is
        * {@linkplain #isDone() done}.  If this future is already
        * completed, the specified listener is notified immediately.
        */
   ```
   The JavaDoc of `Future.addListener` states that the listener is notified
   immediately if the future is already completed (which covers both the
   successful and the exceptional case). Apparently, that doesn't match the
   implementation.
   
   Right now, we're missing handling for the `isCancelled` and `isSuccess`
   cases. It's not clear whether they can actually occur here. But since the
   listener implementation above covers (at least) the success case, wouldn't
   it be reasonable to cover it here as well?
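
A minimal sketch of what covering all three terminal states could look like. It uses plain JDK `CompletableFuture`s as stand-ins: the real `connectFuture` is a Netty `ChannelFuture`, and the names `TerminalStateSketch` and `propagateIfDone` are hypothetical and not part of the PR.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;

// Hypothetical illustration only; JDK futures stand in for Netty's ChannelFuture.
class TerminalStateSketch {

    /**
     * If source is already done and target is still unresolved, mirrors the
     * terminal state of source (cancelled, failed, or successful) into target.
     * Returns true only if this call completed target.
     */
    static <T> boolean propagateIfDone(CompletableFuture<T> source, CompletableFuture<T> target) {
        if (!source.isDone() || target.isDone()) {
            return false;
        }
        // Check cancellation first: a cancelled CompletableFuture also reports
        // isCompletedExceptionally() == true.
        if (source.isCancelled()) {
            return target.cancel(false);
        }
        if (source.isCompletedExceptionally()) {
            // handle() exposes the stored failure without throwing.
            Throwable cause = source.handle((value, error) -> error).join();
            return target.completeExceptionally(cause);
        }
        return target.complete(source.join());
    }

    public static void main(String[] args) {
        CompletableFuture<String> connect = new CompletableFuture<>();
        connect.completeExceptionally(new IOException("connect failed"));

        CompletableFuture<String> channel = new CompletableFuture<>();
        boolean completedHere = propagateIfDone(connect, channel);
        System.out.println(completedHere + " " + channel.isCompletedExceptionally());
    }
}
```

Whether the success and cancellation branches are reachable in `RestClient` is exactly the open question above; the sketch only shows that handling them costs a few lines.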



##########
flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java:
##########
@@ -207,6 +209,42 @@ public void testRestClientClosedHandling() throws Exception {
         }
     }
 
+    /**
+     * Tests that the futures returned by {@link RestClient} fail immediately if the client is
+     * already closed.
+     *
+     * <p>See FLINK-32583
+     */
+    @Test
+    public void testCloseClientBeforeRequest() throws Exception {
+        // Note that the executor passed to the RestClient constructor is not the same as the
+        // executor used by Netty
+        try (final RestClient restClient =
+                new RestClient(new Configuration(), Executors.directExecutor())) {
+            // Intentionally close the client (and thus also the executor used by Netty)
+            restClient.close();
+
+            CompletableFuture<?> future =
+                    restClient.sendRequest(
+                            unroutableIp,
+                            80,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance());
+
+            // Call get() on the future with a timeout of 0 so we can test that the exception thrown
+            // is not a TimeoutException, which is what would be thrown if restClient were not
+            // already closed
+            final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS);
+
+            final ExecutionException executionException =
+                    assertThrows(ExecutionException.class, getFuture);
+            final Throwable throwable = ExceptionUtils.stripExecutionException(executionException);
+            assertThat(throwable, instanceOf(IOException.class));
+            assertThat(throwable.getMessage(), containsString("RestClient is closed"));

Review Comment:
   ```suggestion
               final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause();
               assertThat(cause, instanceOf(IOException.class));
               assertThat(cause.getMessage(), containsString("RestClient is closed"));
   ```
   nit: What about extracting the cause right away and asserting on it
   directly? That makes the test more restrictive. We shouldn't expect an
   exception chain with multiple layers here, should we?
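
To illustrate why asserting on `getCause()` is the stricter check, here is a small JDK-only sketch. `DirectCauseSketch`, `directCause`, and `stripAll` are hypothetical names; `stripAll` only approximates what `ExceptionUtils.stripExecutionException` is assumed to do (unwrapping every `ExecutionException` layer) and is not Flink's implementation.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical illustration only; not part of the PR or of Flink.
class DirectCauseSketch {

    /** Returns the immediate cause of the ExecutionException thrown by get(). */
    static Throwable directCause(CompletableFuture<?> future) {
        try {
            future.get(0, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            return e.getCause();
        } catch (InterruptedException | TimeoutException e) {
            throw new IllegalStateException("future did not fail as expected", e);
        }
        throw new IllegalStateException("future completed normally");
    }

    /** Assumed stand-in for stripExecutionException: unwraps all ExecutionException layers. */
    static Throwable stripAll(Throwable throwable) {
        Throwable current = throwable;
        while (current instanceof ExecutionException && current.getCause() != null) {
            current = current.getCause();
        }
        return current;
    }

    public static void main(String[] args) {
        // A failure wrapped in an extra, unintended ExecutionException layer.
        CompletableFuture<Void> nested = new CompletableFuture<>();
        nested.completeExceptionally(
                new ExecutionException(new IOException("RestClient is closed")));

        Throwable direct = directCause(nested);
        System.out.println("direct cause:   " + direct.getClass().getSimpleName());
        System.out.println("stripped cause: " + stripAll(direct).getClass().getSimpleName());
    }
}
```

With the extra layer, an `instanceOf(IOException.class)` assertion on the direct cause would fail, while the stripped variant would hide the unexpected nesting.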



##########
flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java:
##########
@@ -501,6 +501,22 @@ private <P extends ResponseBody> CompletableFuture<P> submitRequest(
                     }
                 });
 
+        // [FLINK-32583] If connectFuture failed instantly but channelFuture is unresolved, it may
+        // mean the executor service Netty is using has shut down, in which case the above listener
+        // to complete channelFuture will never run
+        if (connectFuture.isDone() && !connectFuture.isSuccess() && !channelFuture.isDone()) {
+            final String message;
+            if (!isRunning.get()) {

Review Comment:
   nit: Could we invert the `if` condition? Just for readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
