This is an automated email from the ASF dual-hosted git repository.

MartijnVisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 676365b5ee7 [FLINK-39858][rest] Fail orphaned in-flight request futures on RestClient close (#28315)
676365b5ee7 is described below

commit 676365b5ee7052ddca7c98737f8c9cb49584cc32
Author: Martijn Visser <[email protected]>
AuthorDate: Fri Jun 5 19:03:54 2026 +0200

    [FLINK-39858][rest] Fail orphaned in-flight request futures on RestClient close (#28315)
    
    * [FLINK-39858][rest] Fail orphaned in-flight request futures on RestClient close
    
    RestClient tracked in-flight requests only via responseChannelFutures, whose
    entries the connect listener removes once connect completes. A close() racing
    with a request already past the connect phase could leave its response future
    uncompleted, hanging the caller. Track each terminal response future for its
    whole lifetime and fail those on close.
    
    Generated-by: Claude Code (Claude Opus 4.8)
---
 .../org/apache/flink/runtime/rest/RestClient.java  | 110 ++++++++-----
 .../apache/flink/runtime/rest/RestClientTest.java  | 179 +++++++++++++++++++++
 2 files changed, 253 insertions(+), 36 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index bbe2d0cf3df..3de626876c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -139,13 +139,21 @@ public class RestClient implements AutoCloseableAsync {
 
     public static final String VERSION_PLACEHOLDER = "{{VERSION}}";
 
+    @VisibleForTesting
+    static final String CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE =
+            "RestClient closed before request completed";
+
     private final String urlPrefix;
 
-    // Used to track unresolved request futures in case they need to be 
resolved when the client is
-    // closed
     private final Collection<CompletableFuture<Channel>> 
responseChannelFutures =
             ConcurrentHashMap.newKeySet();
 
+    // Unlike responseChannelFutures, which only covers the connect phase, 
this tracks the terminal
+    // response future for its whole lifetime so that close() can fail a 
request that is already
+    // in-flight (its response has not yet arrived).
+    private final Collection<CompletableFuture<?>> pendingRequestFutures =
+            ConcurrentHashMap.newKeySet();
+
     private final List<OutboundChannelHandlerFactory> 
outboundChannelHandlerFactories;
     private final boolean useInternalEventLoopGroup;
 
@@ -316,6 +324,11 @@ public class RestClient implements AutoCloseableAsync {
         return responseChannelFutures;
     }
 
+    @VisibleForTesting
+    Collection<CompletableFuture<?>> getPendingRequestFutures() {
+        return pendingRequestFutures;
+    }
+
     @VisibleForTesting
     List<OutboundChannelHandlerFactory> getOutboundChannelHandlerFactories() {
         return outboundChannelHandlerFactories;
@@ -369,8 +382,14 @@ public class RestClient implements AutoCloseableAsync {
                 future ->
                         future.completeExceptionally(
                                 new IllegalStateException(
-                                        "RestClient closed before request 
completed")));
+                                        
CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE)));
         responseChannelFutures.clear();
+        pendingRequestFutures.forEach(
+                future ->
+                        future.completeExceptionally(
+                                new IllegalStateException(
+                                        
CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE)));
+        pendingRequestFutures.clear();
     }
 
     public <
@@ -618,40 +637,59 @@ public class RestClient implements AutoCloseableAsync {
                     }
                 });
 
-        return channelFuture
-                .thenComposeAsync(
-                        channel -> {
-                            ClientHandler handler = 
channel.pipeline().get(ClientHandler.class);
-
-                            CompletableFuture<JsonResponse> future;
-                            boolean success = false;
-
-                            try {
-                                if (handler == null) {
-                                    throw new IOException(
-                                            "Netty pipeline was not properly 
initialized.");
-                                } else {
-                                    httpRequest.writeTo(channel);
-                                    future = handler.getJsonFuture();
-                                    success = true;
-                                }
-                            } catch (IOException e) {
-                                future =
-                                        FutureUtils.completedExceptionally(
-                                                new ConnectionException(
-                                                        "Could not write 
request.", e));
-                            } finally {
-                                if (!success) {
-                                    channel.close();
-                                }
-                            }
+        final CompletableFuture<P> responseFuture =
+                channelFuture
+                        .thenComposeAsync(
+                                channel -> {
+                                    ClientHandler handler =
+                                            
channel.pipeline().get(ClientHandler.class);
+
+                                    CompletableFuture<JsonResponse> future;
+                                    boolean success = false;
+
+                                    try {
+                                        if (handler == null) {
+                                            throw new IOException(
+                                                    "Netty pipeline was not 
properly initialized.");
+                                        } else {
+                                            httpRequest.writeTo(channel);
+                                            future = handler.getJsonFuture();
+                                            success = true;
+                                        }
+                                    } catch (IOException e) {
+                                        future =
+                                                
FutureUtils.completedExceptionally(
+                                                        new 
ConnectionException(
+                                                                "Could not 
write request.", e));
+                                    } finally {
+                                        if (!success) {
+                                            channel.close();
+                                        }
+                                    }
+
+                                    return future;
+                                },
+                                executor)
+                        .thenComposeAsync(
+                                (JsonResponse rawResponse) ->
+                                        parseResponse(rawResponse, 
responseType),
+                                executor);
+
+        // Register for the whole request lifetime; deregister on completion 
to avoid leaking.
+        pendingRequestFutures.add(responseFuture);
+        responseFuture.whenComplete(
+                (ignored, throwable) -> 
pendingRequestFutures.remove(responseFuture));
+
+        // Re-check after registering to cover a close() that ran between the isRunning check at the
+        // top of this method and the registration above. Such a close may have drained and cleared
+        // pendingRequestFutures without ever seeing this future, so fail it unconditionally rather
+        // than only if still registered; completeExceptionally is a no-op once it has completed.
+        if (!isRunning.get()) {
+            responseFuture.completeExceptionally(
+                    new IllegalStateException(CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE));
+        }
 
-                            return future;
-                        },
-                        executor)
-                .thenComposeAsync(
-                        (JsonResponse rawResponse) -> 
parseResponse(rawResponse, responseType),
-                        executor);
+        return responseFuture;
     }
 
     private static <P extends ResponseBody> CompletableFuture<P> parseResponse(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index bbad7c7df1c..4630011b2ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.core.testutils.FlinkAssertions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -58,8 +59,11 @@ import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -360,6 +364,181 @@ class RestClientTest {
         }
     }
 
+    /**
+     * Verifies that {@code close()} fails a request that is in-flight past 
the connect phase, and
+     * that it does so specifically through the {@code pendingRequestFutures} 
mechanism rather than
+     * through {@code ClientHandler#channelInactive}.
+     *
+     * <p>The terminal response future is only wired to the {@code 
ClientHandler}'s {@code
+     * jsonFuture} once the first response-composition stage runs on the 
client's executor. This
+     * test installs a <em>deferring</em> executor that captures those stages 
without ever running
+     * them, so:
+     *
+     * <ul>
+     *   <li>the connect phase still completes (the connect listener runs on 
the Netty event loop
+     *       and drains {@code responseChannelFutures}), leaving the request 
in-flight; but
+     *   <li>the {@code channelInactive} completion of {@code jsonFuture} 
triggered by {@code
+     *       close()} can never reach the terminal future, because the stage 
that would subscribe to
+     *       {@code jsonFuture} never runs.
+     * </ul>
+     *
+     * <p>The only mechanism left that can complete the terminal future is 
{@code
+     * pendingRequestFutures}. The test therefore asserts the 
<em>identity</em> of the failure
+     * ({@link IllegalStateException} with {@code 
CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE}), not
+     * merely that the future failed: a {@code ConnectionClosedException} here 
would indicate the
+     * {@code channelInactive} path completed it instead. Without the {@code 
pendingRequestFutures}
+     * tracking the terminal future would never complete and {@code 
failsWithin} would elapse.
+     */
+    @Test
+    void testCloseFailsInFlightRequestFutureViaPendingRequestFutures() throws 
Exception {
+        final Configuration config = new Configuration();
+
+        // Captures the response-composition stages without ever executing 
them, so the terminal
+        // future is never subscribed to the handler's jsonFuture.
+        final Queue<Runnable> deferredStages = new ConcurrentLinkedQueue<>();
+        final Executor deferringExecutor = deferredStages::add;
+
+        Socket connectionSocket = null;
+        try (final ServerSocket serverSocket = new ServerSocket(0);
+                final RestClient restClient = new RestClient(config, 
deferringExecutor)) {
+
+            final String targetAddress = "localhost";
+            final int targetPort = serverSocket.getLocalPort();
+
+            // A server that accepts the connection but never sends a 
response, so the request stays
+            // in its in-flight (response) phase until the client is closed.
+            final CompletableFuture<Socket> acceptedSocket =
+                    CompletableFuture.supplyAsync(
+                            CheckedSupplier.unchecked(
+                                    () -> 
NetUtils.acceptWithoutTimeout(serverSocket)));
+
+            assertThat(restClient.getResponseChannelFutures()).isEmpty();
+            assertThat(restClient.getPendingRequestFutures()).isEmpty();
+
+            final CompletableFuture<EmptyResponseBody> responseFuture =
+                    restClient.sendRequest(
+                            targetAddress,
+                            targetPort,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance(),
+                            Collections.emptyList());
+
+            // Once the server accepts, the connect listener has run and 
removed the connect-phase
+            // future from responseChannelFutures: the request is now 
in-flight. The first
+            // composition stage has been submitted to the deferring executor 
(captured, not run).
+            connectionSocket = acceptedSocket.get(TIMEOUT, TimeUnit.SECONDS);
+
+            CommonTestUtils.waitUtil(
+                    () -> restClient.getResponseChannelFutures().isEmpty(),
+                    Duration.ofSeconds(TIMEOUT),
+                    "responseChannelFutures was not drained after the connect 
phase completed");
+
+            restClient.close();
+
+            // The terminal future can only have been completed by the 
pendingRequestFutures path:
+            // its composition stages never ran, so the channelInactive 
completion of jsonFuture
+            // could not reach it. Asserting the failure identity proves the 
change is exercised.
+            assertThat(responseFuture)
+                    .as("close() must fail the in-flight future via 
pendingRequestFutures")
+                    .failsWithin(Duration.ofSeconds(TIMEOUT))
+                    .withThrowableOfType(ExecutionException.class)
+                    .withCauseInstanceOf(IllegalStateException.class)
+                    
.withMessageContaining(RestClient.CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE);
+
+            // After close, the tracking collection must be drained.
+            CommonTestUtils.waitUtil(
+                    () -> restClient.getPendingRequestFutures().isEmpty(),
+                    Duration.ofSeconds(TIMEOUT),
+                    "pendingRequestFutures was not drained after close");
+
+            // Sanity check: exactly the first composition stage was deferred 
(never executed),
+            // which is what isolates the pendingRequestFutures path from 
channelInactive. The
+            // second stage cannot be submitted until the first runs, which it 
never does.
+            assertThat(deferredStages).hasSize(1);
+        } finally {
+            if (connectionSocket != null) {
+                connectionSocket.close();
+            }
+        }
+    }
+
+    /**
+     * End-to-end companion to {@link 
#testCloseFailsInFlightRequestFutureViaPendingRequestFutures}:
+     * with the real executor running the response-composition stages, 
verifies that {@code close()}
+     * fails an in-flight request future. This exercises the normal production 
path (including the
+     * handler's {@code channelInactive} completion) rather than isolating a 
single mechanism. The
+     * request is driven against a local server that accepts the connection 
but never replies; all
+     * waits are bounded so a regression fails rather than hangs.
+     */
+    @Test
+    void testCloseFailsInFlightRequestFutureAfterConnectPhase() throws 
Exception {
+        final Configuration config = new Configuration();
+
+        Socket connectionSocket = null;
+        try (final ServerSocket serverSocket = new ServerSocket(0);
+                final RestClient restClient =
+                        new RestClient(config, 
EXECUTOR_EXTENSION.getExecutor())) {
+
+            final String targetAddress = "localhost";
+            final int targetPort = serverSocket.getLocalPort();
+
+            // A server that accepts the connection but never sends a 
response, so the request stays
+            // in its in-flight (response) phase until the client is closed.
+            final CompletableFuture<Socket> acceptedSocket =
+                    CompletableFuture.supplyAsync(
+                            CheckedSupplier.unchecked(
+                                    () -> 
NetUtils.acceptWithoutTimeout(serverSocket)));
+
+            assertThat(restClient.getResponseChannelFutures()).isEmpty();
+            assertThat(restClient.getPendingRequestFutures()).isEmpty();
+
+            final CompletableFuture<EmptyResponseBody> responseFuture =
+                    restClient.sendRequest(
+                            targetAddress,
+                            targetPort,
+                            new TestMessageHeaders(),
+                            EmptyMessageParameters.getInstance(),
+                            EmptyRequestBody.getInstance(),
+                            Collections.emptyList());
+
+            // Once the server accepts, the connect listener has run and 
removed the connect-phase
+            // future from responseChannelFutures: the request is now 
in-flight.
+            connectionSocket = acceptedSocket.get(TIMEOUT, TimeUnit.SECONDS);
+
+            CommonTestUtils.waitUtil(
+                    () -> restClient.getResponseChannelFutures().isEmpty(),
+                    Duration.ofSeconds(TIMEOUT),
+                    "responseChannelFutures was not drained after the connect 
phase completed");
+
+            // Without the fix the in-flight future is tracked by nothing, so 
this bounded wait
+            // elapses and the assertion fails (it does not hang).
+            CommonTestUtils.waitUtil(
+                    () -> restClient.getPendingRequestFutures().size() == 1,
+                    Duration.ofSeconds(TIMEOUT),
+                    "Terminal response future must remain tracked by 
pendingRequestFutures while "
+                            + "the request is in-flight so that close can fail 
it");
+
+            restClient.close();
+
+            // The terminal future must be failed by close (bounded await, 
never hangs).
+            assertThat(responseFuture)
+                    .as("Closing the client must fail the in-flight request 
future")
+                    .failsWithin(Duration.ofSeconds(TIMEOUT))
+                    .withThrowableOfType(ExecutionException.class);
+
+            // After close, the tracking collection must be drained.
+            CommonTestUtils.waitUtil(
+                    () -> restClient.getPendingRequestFutures().isEmpty(),
+                    Duration.ofSeconds(TIMEOUT),
+                    "pendingRequestFutures was not drained after close");
+        } finally {
+            if (connectionSocket != null) {
+                connectionSocket.close();
+            }
+        }
+    }
+
     @Test
     void testResponseChannelFuturesResolvedExceptionallyOnClose() throws 
Exception {
         try (final RestClient restClient =

Reply via email to