This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 559b8f14ea9cc873a64f9c7954951882444693ac Author: Till Rohrmann <[email protected]> AuthorDate: Wed Sep 26 09:33:05 2018 +0200 [FLINK-10415] Fail response future if connection closes in RestClient If the RestClient detects that a connection was closed (channel became inactive), then it now fails the json response future with a ConnectionClosedException. --- .../runtime/rest/ConnectionClosedException.java | 41 +++++++++ .../org/apache/flink/runtime/rest/RestClient.java | 6 ++ .../apache/flink/runtime/rest/RestClientTest.java | 102 +++++++++++++++++++++ 3 files changed, 149 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java new file mode 100644 index 0000000..b294f49 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import java.io.IOException; + +/** + * Exception which is thrown if the {@link RestClient} detects that a connection + * was closed. + */ +public class ConnectionClosedException extends IOException { + private static final long serialVersionUID = 3802002501688542472L; + + public ConnectionClosedException(String message) { + super(message); + } + + public ConnectionClosedException(String message, Throwable cause) { + super(message, cause); + } + + public ConnectionClosedException(Throwable cause) { + super(cause); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 3aa93bb..d0280a5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -448,6 +448,12 @@ public class RestClient implements AutoCloseableAsync { } @Override + public void channelInactive(ChannelHandlerContext ctx) { + jsonFuture.completeExceptionally(new ConnectionClosedException("Channel became inactive.")); + ctx.close(); + } + + @Override public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) { if (cause instanceof TooLongFrameException) { jsonFuture.completeExceptionally(new TooLongFrameException(String.format( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java index 11434ad..4a0f5a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -26,8 +26,10 @@ import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.versioning.RestAPIVersion; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; +import org.apache.flink.util.function.CheckedSupplier; import org.apache.flink.shaded.netty4.io.netty.channel.ConnectTimeoutException; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; @@ -35,10 +37,14 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import org.junit.Assert; import org.junit.Test; +import java.io.IOException; +import java.net.ServerSocket; +import java.net.Socket; import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.instanceOf; @@ -51,6 +57,8 @@ public class RestClientTest extends TestLogger { private static final String unroutableIp = "10.255.255.1"; + private static final long TIMEOUT = 10L; + @Test public void testConnectionTimeout() throws Exception { final Configuration config = new Configuration(); @@ -88,6 +96,100 @@ public class RestClientTest extends TestLogger { } } + /** + * Tests that we fail the operation if the remote connection closes. + */ + @Test + public void testConnectionClosedHandling() throws Exception { + try (final ServerSocket serverSocket = new ServerSocket(0); + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) { + + final String targetAddress = "localhost"; + final int targetPort = serverSocket.getLocalPort(); + + // start server + final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept)); + + final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest( + targetAddress, + targetPort, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList()); + + Socket connectionSocket = null; + + try { + connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); + } catch (TimeoutException ignored) { + // could not establish a server connection --> see that the response failed + socketCompletableFuture.cancel(true); + } + + if (connectionSocket != null) { + // close connection + connectionSocket.close(); + } + + try { + responseFuture.get(); + } catch (ExecutionException ee) { + if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) { + throw ee; + } + } + } + } + + /** + * Tests that we fail the operation if the client closes. + */ + @Test + public void testRestClientClosedHandling() throws Exception { + + Socket connectionSocket = null; + + try (final ServerSocket serverSocket = new ServerSocket(0); + final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) { + + final String targetAddress = "localhost"; + final int targetPort = serverSocket.getLocalPort(); + + // start server + final CompletableFuture<Socket> socketCompletableFuture = CompletableFuture.supplyAsync(CheckedSupplier.unchecked(serverSocket::accept)); + + final CompletableFuture<EmptyResponseBody> responseFuture = restClient.sendRequest( + targetAddress, + targetPort, + new TestMessageHeaders(), + EmptyMessageParameters.getInstance(), + EmptyRequestBody.getInstance(), + Collections.emptyList()); + + try { + connectionSocket = socketCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS); + } catch (TimeoutException ignored) { + // could not establish a server connection --> see that the response failed + socketCompletableFuture.cancel(true); + } + + restClient.close(); + + try { + responseFuture.get(); + } catch (ExecutionException ee) { + if (!ExceptionUtils.findThrowable(ee, IOException.class).isPresent()) { + throw ee; + } + } + } finally { + if (connectionSocket != null) { + connectionSocket.close(); + } + } + } + private static class TestMessageHeaders implements MessageHeaders<EmptyRequestBody, EmptyResponseBody, EmptyMessageParameters> { @Override
