XComp commented on code in PR #22987: URL: https://github.com/apache/flink/pull/22987#discussion_r1274946566
########## flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java: ########## @@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws Exception { } } + /** + * Tests that the futures returned by {@link RestClient} fail immediately if the client is + * already closed. + * + * <p>See FLINK-32583 + */ + @Test + public void testCloseClientBeforeRequest() throws Exception { + try (final RestClient restClient = + new RestClient(new Configuration(), Executors.directExecutor())) { + restClient.close(); // Intentionally close the client prior to the request + + CompletableFuture<?> future = + restClient.sendRequest( + unroutableIp, + 80, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()); + + // Call get() on the future with a timeout of 0s so we can test that the exception + // thrown is not a TimeoutException, which is what would be thrown if restClient were + // not already closed + final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS); + + final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause(); + assertThat(cause, instanceOf(IllegalStateException.class)); + assertThat(cause.getMessage(), equalTo("RestClient is already closed")); + } + } + + @Test + public void testCloseClientWhileProcessingRequest() throws Exception { + // Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's + // request state machine, closing the client at a particular moment + final OneShotLatch connectTriggered = new OneShotLatch(); + final OneShotLatch closeTriggered = new OneShotLatch(); + final SelectStrategy fallbackSelectStrategy = + DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(); + final SelectStrategyFactory selectStrategyFactory = + () -> + (selectSupplier, hasTasks) -> { + connectTriggered.trigger(); + closeTriggered.awaitQuietly(1, TimeUnit.SECONDS); Review Comment: ```suggestion closeTriggered.awaitQuietly(); ``` The community agreed on not using timeouts in tests (see [Flink Coding Conventions](https://flink.apache.org/how-to-contribute/code-style-and-quality-common/#avoid-timeouts-in-junit-tests)) since they are a source of instabilities. Instead the test would run until the overall CI build is cancelled after ~4hrs. ########## flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java: ########## @@ -207,6 +218,120 @@ public void testRestClientClosedHandling() throws Exception { } } + /** + * Tests that the futures returned by {@link RestClient} fail immediately if the client is + * already closed. + * + * <p>See FLINK-32583 + */ + @Test + public void testCloseClientBeforeRequest() throws Exception { + try (final RestClient restClient = + new RestClient(new Configuration(), Executors.directExecutor())) { + restClient.close(); // Intentionally close the client prior to the request + + CompletableFuture<?> future = + restClient.sendRequest( + unroutableIp, + 80, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()); + + // Call get() on the future with a timeout of 0s so we can test that the exception + // thrown is not a TimeoutException, which is what would be thrown if restClient were + // not already closed + final ThrowingRunnable getFuture = () -> future.get(0, TimeUnit.SECONDS); + + final Throwable cause = assertThrows(ExecutionException.class, getFuture).getCause(); + assertThat(cause, instanceOf(IllegalStateException.class)); + assertThat(cause.getMessage(), equalTo("RestClient is already closed")); + } + } + + @Test + public void testCloseClientWhileProcessingRequest() throws Exception { + // Set up a Netty SelectStrategy with latches that allow us to step forward through Netty's + // request state machine, closing the client at a particular moment + final OneShotLatch connectTriggered = new OneShotLatch(); + final OneShotLatch closeTriggered = new OneShotLatch(); + final SelectStrategy fallbackSelectStrategy = + DefaultSelectStrategyFactory.INSTANCE.newSelectStrategy(); + final SelectStrategyFactory selectStrategyFactory = + () -> + (selectSupplier, hasTasks) -> { + connectTriggered.trigger(); + closeTriggered.awaitQuietly(1, TimeUnit.SECONDS); + + return fallbackSelectStrategy.calculateStrategy( + selectSupplier, hasTasks); + }; + + try (final RestClient restClient = + new RestClient( + new Configuration(), Executors.directExecutor(), selectStrategyFactory)) { + // Check that client's internal collection of pending response futures is empty prior to + // the request + assertThat(restClient.getResponseChannelFutures(), empty()); + + final CompletableFuture<?> requestFuture = + restClient.sendRequest( + unroutableIp, + 80, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance()); + + // Check that client's internal collection of pending response futures now has one + // entry, presumably due to the call to sendRequest + assertThat(restClient.getResponseChannelFutures(), hasSize(1)); + + // Wait for Netty to start connecting, then while it's paused in the SelectStrategy, + // close the client before unpausing Netty + connectTriggered.await(1, TimeUnit.SECONDS); Review Comment: same here: No timeout should be used. ########## flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java: ########## @@ -150,11 +159,30 @@ public static RestClient forUrl(Configuration configuration, Executor executor, public RestClient(Configuration configuration, Executor executor) throws ConfigurationException { - this(configuration, executor, null, -1); + this(configuration, executor, null, -1, DefaultSelectStrategyFactory.INSTANCE); Review Comment: True, I overlooked that we're doing it your way in for the default port `-1`. Feel free to revert it if you want to be more consistent in this class :innocent: -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org