This is an automated email from the ASF dual-hosted git repository.
MartijnVisser pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 676365b5ee7 [FLINK-39858][rest] Fail orphaned in-flight request
futures on RestClient close (#28315)
676365b5ee7 is described below
commit 676365b5ee7052ddca7c98737f8c9cb49584cc32
Author: Martijn Visser <[email protected]>
AuthorDate: Fri Jun 5 19:03:54 2026 +0200
[FLINK-39858][rest] Fail orphaned in-flight request futures on RestClient
close (#28315)
* [FLINK-39858][rest] Fail orphaned in-flight request futures on RestClient
close
RestClient tracked in-flight requests only via responseChannelFutures, whose
entries the connect listener removes once connect completes. A close() racing
with a request already past the connect phase could leave its response future
uncompleted, hanging the caller. Track each terminal response future for its
whole lifetime and fail those on close.
Generated-by: Claude Code (Claude Opus 4.8)
---
.../org/apache/flink/runtime/rest/RestClient.java | 110 ++++++++-----
.../apache/flink/runtime/rest/RestClientTest.java | 179 +++++++++++++++++++++
2 files changed, 253 insertions(+), 36 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index bbe2d0cf3df..3de626876c5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -139,13 +139,21 @@ public class RestClient implements AutoCloseableAsync {
public static final String VERSION_PLACEHOLDER = "{{VERSION}}";
+ @VisibleForTesting
+ static final String CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE =
+ "RestClient closed before request completed";
+
private final String urlPrefix;
- // Used to track unresolved request futures in case they need to be resolved when the client is
- // closed
private final Collection<CompletableFuture<Channel>>
responseChannelFutures =
ConcurrentHashMap.newKeySet();
+ // Unlike responseChannelFutures, which only covers the connect phase, this tracks the terminal
+ // response future for its whole lifetime so that close() can fail a request that is already
+ // in-flight (its response has not yet arrived).
+ private final Collection<CompletableFuture<?>> pendingRequestFutures =
+ ConcurrentHashMap.newKeySet();
+
private final List<OutboundChannelHandlerFactory>
outboundChannelHandlerFactories;
private final boolean useInternalEventLoopGroup;
@@ -316,6 +324,11 @@ public class RestClient implements AutoCloseableAsync {
return responseChannelFutures;
}
+ @VisibleForTesting
+ Collection<CompletableFuture<?>> getPendingRequestFutures() {
+ return pendingRequestFutures;
+ }
+
@VisibleForTesting
List<OutboundChannelHandlerFactory> getOutboundChannelHandlerFactories() {
return outboundChannelHandlerFactories;
@@ -369,8 +382,14 @@ public class RestClient implements AutoCloseableAsync {
future ->
future.completeExceptionally(
new IllegalStateException(
- "RestClient closed before request
completed")));
+
CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE)));
responseChannelFutures.clear();
+ pendingRequestFutures.forEach(
+ future ->
+ future.completeExceptionally(
+ new IllegalStateException(
+
CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE)));
+ pendingRequestFutures.clear();
}
public <
@@ -618,40 +637,59 @@ public class RestClient implements AutoCloseableAsync {
}
});
- return channelFuture
- .thenComposeAsync(
- channel -> {
- ClientHandler handler =
channel.pipeline().get(ClientHandler.class);
-
- CompletableFuture<JsonResponse> future;
- boolean success = false;
-
- try {
- if (handler == null) {
- throw new IOException(
- "Netty pipeline was not properly
initialized.");
- } else {
- httpRequest.writeTo(channel);
- future = handler.getJsonFuture();
- success = true;
- }
- } catch (IOException e) {
- future =
- FutureUtils.completedExceptionally(
- new ConnectionException(
- "Could not write
request.", e));
- } finally {
- if (!success) {
- channel.close();
- }
- }
+ final CompletableFuture<P> responseFuture =
+ channelFuture
+ .thenComposeAsync(
+ channel -> {
+ ClientHandler handler =
+
channel.pipeline().get(ClientHandler.class);
+
+ CompletableFuture<JsonResponse> future;
+ boolean success = false;
+
+ try {
+ if (handler == null) {
+ throw new IOException(
+ "Netty pipeline was not
properly initialized.");
+ } else {
+ httpRequest.writeTo(channel);
+ future = handler.getJsonFuture();
+ success = true;
+ }
+ } catch (IOException e) {
+ future =
+
FutureUtils.completedExceptionally(
+ new
ConnectionException(
+ "Could not
write request.", e));
+ } finally {
+ if (!success) {
+ channel.close();
+ }
+ }
+
+ return future;
+ },
+ executor)
+ .thenComposeAsync(
+ (JsonResponse rawResponse) ->
+ parseResponse(rawResponse,
responseType),
+ executor);
+
+ // Register for the whole request lifetime; deregister on completion to avoid leaking.
+ pendingRequestFutures.add(responseFuture);
+ responseFuture.whenComplete(
+ (ignored, throwable) ->
pendingRequestFutures.remove(responseFuture));
+
+ // Re-check after registering to cover a close() that ran between the isRunning check at the
+ // top of this method and the registration above (such a close could not have seen this
+ // future). Only fail it if still registered, so we do not double-complete; completion is
+ // idempotent regardless.
+ if (!isRunning.get() && pendingRequestFutures.remove(responseFuture)) {
+ responseFuture.completeExceptionally(
+ new
IllegalStateException(CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE));
+ }
- return future;
- },
- executor)
- .thenComposeAsync(
- (JsonResponse rawResponse) ->
parseResponse(rawResponse, responseType),
- executor);
+ return responseFuture;
}
private static <P extends ResponseBody> CompletableFuture<P> parseResponse(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index bbad7c7df1c..4630011b2ee 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.rest;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
@@ -58,8 +59,11 @@ import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.Queue;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -360,6 +364,181 @@ class RestClientTest {
}
}
+ /**
+ * Verifies that {@code close()} fails a request that is in-flight past
the connect phase, and
+ * that it does so specifically through the {@code pendingRequestFutures}
mechanism rather than
+ * through {@code ClientHandler#channelInactive}.
+ *
+ * <p>The terminal response future is only wired to the {@code
ClientHandler}'s {@code
+ * jsonFuture} once the first response-composition stage runs on the
client's executor. This
+ * test installs a <em>deferring</em> executor that captures those stages
without ever running
+ * them, so:
+ *
+ * <ul>
+ * <li>the connect phase still completes (the connect listener runs on
the Netty event loop
+ * and drains {@code responseChannelFutures}), leaving the request
in-flight; but
+ * <li>the {@code channelInactive} completion of {@code jsonFuture}
triggered by {@code
+ * close()} can never reach the terminal future, because the stage
that would subscribe to
+ * {@code jsonFuture} never runs.
+ * </ul>
+ *
+ * <p>The only mechanism left that can complete the terminal future is
{@code
+ * pendingRequestFutures}. The test therefore asserts the
<em>identity</em> of the failure
+ * ({@link IllegalStateException} with {@code
CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE}), not
+ * merely that the future failed: a {@code ConnectionClosedException} here
would indicate the
+ * {@code channelInactive} path completed it instead. Without the {@code
pendingRequestFutures}
+ * tracking the terminal future would never complete and {@code
failsWithin} would elapse.
+ */
+ @Test
+ void testCloseFailsInFlightRequestFutureViaPendingRequestFutures() throws
Exception {
+ final Configuration config = new Configuration();
+
+ // Captures the response-composition stages without ever executing
them, so the terminal
+ // future is never subscribed to the handler's jsonFuture.
+ final Queue<Runnable> deferredStages = new ConcurrentLinkedQueue<>();
+ final Executor deferringExecutor = deferredStages::add;
+
+ Socket connectionSocket = null;
+ try (final ServerSocket serverSocket = new ServerSocket(0);
+ final RestClient restClient = new RestClient(config,
deferringExecutor)) {
+
+ final String targetAddress = "localhost";
+ final int targetPort = serverSocket.getLocalPort();
+
+ // A server that accepts the connection but never sends a
response, so the request stays
+ // in its in-flight (response) phase until the client is closed.
+ final CompletableFuture<Socket> acceptedSocket =
+ CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(
+ () ->
NetUtils.acceptWithoutTimeout(serverSocket)));
+
+ assertThat(restClient.getResponseChannelFutures()).isEmpty();
+ assertThat(restClient.getPendingRequestFutures()).isEmpty();
+
+ final CompletableFuture<EmptyResponseBody> responseFuture =
+ restClient.sendRequest(
+ targetAddress,
+ targetPort,
+ new TestMessageHeaders(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList());
+
+ // Once the server accepts, the connect listener has run and
removed the connect-phase
+ // future from responseChannelFutures: the request is now
in-flight. The first
+ // composition stage has been submitted to the deferring executor
(captured, not run).
+ connectionSocket = acceptedSocket.get(TIMEOUT, TimeUnit.SECONDS);
+
+ CommonTestUtils.waitUtil(
+ () -> restClient.getResponseChannelFutures().isEmpty(),
+ Duration.ofSeconds(TIMEOUT),
+ "responseChannelFutures was not drained after the connect
phase completed");
+
+ restClient.close();
+
+ // The terminal future can only have been completed by the
pendingRequestFutures path:
+ // its composition stages never ran, so the channelInactive
completion of jsonFuture
+ // could not reach it. Asserting the failure identity proves the
change is exercised.
+ assertThat(responseFuture)
+ .as("close() must fail the in-flight future via
pendingRequestFutures")
+ .failsWithin(Duration.ofSeconds(TIMEOUT))
+ .withThrowableOfType(ExecutionException.class)
+ .withCauseInstanceOf(IllegalStateException.class)
+
.withMessageContaining(RestClient.CLOSED_BEFORE_REQUEST_COMPLETED_MESSAGE);
+
+ // After close, the tracking collection must be drained.
+ CommonTestUtils.waitUtil(
+ () -> restClient.getPendingRequestFutures().isEmpty(),
+ Duration.ofSeconds(TIMEOUT),
+ "pendingRequestFutures was not drained after close");
+
+ // Sanity check: exactly the first composition stage was deferred
(never executed),
+ // which is what isolates the pendingRequestFutures path from
channelInactive. The
+ // second stage cannot be submitted until the first runs, which it
never does.
+ assertThat(deferredStages).hasSize(1);
+ } finally {
+ if (connectionSocket != null) {
+ connectionSocket.close();
+ }
+ }
+ }
+
+ /**
+ * End-to-end companion to {@link
#testCloseFailsInFlightRequestFutureViaPendingRequestFutures}:
+ * with the real executor running the response-composition stages,
verifies that {@code close()}
+ * fails an in-flight request future. This exercises the normal production
path (including the
+ * handler's {@code channelInactive} completion) rather than isolating a
single mechanism. The
+ * request is driven against a local server that accepts the connection
but never replies; all
+ * waits are bounded so a regression fails rather than hangs.
+ */
+ @Test
+ void testCloseFailsInFlightRequestFutureAfterConnectPhase() throws
Exception {
+ final Configuration config = new Configuration();
+
+ Socket connectionSocket = null;
+ try (final ServerSocket serverSocket = new ServerSocket(0);
+ final RestClient restClient =
+ new RestClient(config,
EXECUTOR_EXTENSION.getExecutor())) {
+
+ final String targetAddress = "localhost";
+ final int targetPort = serverSocket.getLocalPort();
+
+ // A server that accepts the connection but never sends a
response, so the request stays
+ // in its in-flight (response) phase until the client is closed.
+ final CompletableFuture<Socket> acceptedSocket =
+ CompletableFuture.supplyAsync(
+ CheckedSupplier.unchecked(
+ () ->
NetUtils.acceptWithoutTimeout(serverSocket)));
+
+ assertThat(restClient.getResponseChannelFutures()).isEmpty();
+ assertThat(restClient.getPendingRequestFutures()).isEmpty();
+
+ final CompletableFuture<EmptyResponseBody> responseFuture =
+ restClient.sendRequest(
+ targetAddress,
+ targetPort,
+ new TestMessageHeaders(),
+ EmptyMessageParameters.getInstance(),
+ EmptyRequestBody.getInstance(),
+ Collections.emptyList());
+
+ // Once the server accepts, the connect listener has run and
removed the connect-phase
+ // future from responseChannelFutures: the request is now
in-flight.
+ connectionSocket = acceptedSocket.get(TIMEOUT, TimeUnit.SECONDS);
+
+ CommonTestUtils.waitUtil(
+ () -> restClient.getResponseChannelFutures().isEmpty(),
+ Duration.ofSeconds(TIMEOUT),
+ "responseChannelFutures was not drained after the connect
phase completed");
+
+ // Without the fix the in-flight future is tracked by nothing, so
this bounded wait
+ // elapses and the assertion fails (it does not hang).
+ CommonTestUtils.waitUtil(
+ () -> restClient.getPendingRequestFutures().size() == 1,
+ Duration.ofSeconds(TIMEOUT),
+ "Terminal response future must remain tracked by
pendingRequestFutures while "
+ + "the request is in-flight so that close can fail
it");
+
+ restClient.close();
+
+ // The terminal future must be failed by close (bounded await,
never hangs).
+ assertThat(responseFuture)
+ .as("Closing the client must fail the in-flight request
future")
+ .failsWithin(Duration.ofSeconds(TIMEOUT))
+ .withThrowableOfType(ExecutionException.class);
+
+ // After close, the tracking collection must be drained.
+ CommonTestUtils.waitUtil(
+ () -> restClient.getPendingRequestFutures().isEmpty(),
+ Duration.ofSeconds(TIMEOUT),
+ "pendingRequestFutures was not drained after close");
+ } finally {
+ if (connectionSocket != null) {
+ connectionSocket.close();
+ }
+ }
+ }
+
@Test
void testResponseChannelFuturesResolvedExceptionallyOnClose() throws
Exception {
try (final RestClient restClient =