This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.6 in repository https://gitbox.apache.org/repos/asf/flink.git
commit bf49d9aff4d10487e176c450eaf634eb1c45c290 Author: Till Rohrmann <[email protected]> AuthorDate: Wed Sep 26 09:12:06 2018 +0200 [hotfix] Make RestClient AutoCloseableAsync --- .../org/apache/flink/runtime/rest/RestClient.java | 51 +++++++++++++++------- .../apache/flink/runtime/rest/RestClientTest.java | 3 +- 2 files changed, 36 insertions(+), 18 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 052b9b1..c6ebd35 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.rest.messages.ResponseBody; import org.apache.flink.runtime.rest.util.RestClientException; import org.apache.flink.runtime.rest.util.RestConstants; import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -85,13 +86,14 @@ import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus.REQUEST_ENTITY_TOO_LARGE; /** * This client is the counter-part to the {@link RestServerEndpoint}. */ -public class RestClient { +public class RestClient implements AutoCloseableAsync { private static final Logger LOG = LoggerFactory.getLogger(RestClient.class); private static final ObjectMapper objectMapper = RestMapperUtils.getStrictObjectMapper(); @@ -101,9 +103,14 @@ public class RestClient { private final Bootstrap bootstrap; + private final CompletableFuture<Void> terminationFuture; + + private final AtomicBoolean isRunning = new AtomicBoolean(true); + public RestClient(RestClientConfiguration configuration, Executor executor) { Preconditions.checkNotNull(configuration); this.executor = Preconditions.checkNotNull(executor); + this.terminationFuture = new CompletableFuture<>(); final SSLEngineFactory sslEngineFactory = configuration.getSslEngineFactory(); ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() { @@ -133,30 +140,42 @@ public class RestClient { LOG.info("Rest client endpoint started."); } + @Override + public CompletableFuture<Void> closeAsync() { + return shutdownInternally(Time.seconds(10L)); + } + public void shutdown(Time timeout) { - LOG.info("Shutting down rest endpoint."); - CompletableFuture<?> groupFuture = new CompletableFuture<>(); - if (bootstrap != null) { - if (bootstrap.group() != null) { - bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) - .addListener(finished -> { - if (finished.isSuccess()) { - groupFuture.complete(null); - } else { - groupFuture.completeExceptionally(finished.cause()); - } - }); - } - } + final CompletableFuture<Void> shutDownFuture = shutdownInternally(timeout); try { - groupFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + shutDownFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); LOG.info("Rest endpoint shutdown complete."); } catch (Exception e) { LOG.warn("Rest endpoint shutdown failed.", e); } } + private CompletableFuture<Void> shutdownInternally(Time timeout) { + if (isRunning.compareAndSet(true, false)) { + LOG.info("Shutting down rest endpoint."); + + if (bootstrap != null) { + if (bootstrap.group() != null) { + bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS) + .addListener(finished -> { + if (finished.isSuccess()) { + terminationFuture.complete(null); + } else { + terminationFuture.completeExceptionally(finished.cause()); + } + }); + } + } + } + return terminationFuture; + } + public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest( String targetAddress, int targetPort, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java index 209f2d1..d3d895a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java @@ -49,9 +49,8 @@ public class RestClientTest extends TestLogger { public void testConnectionTimeout() throws Exception { final Configuration config = new Configuration(); config.setLong(RestOptions.CONNECTION_TIMEOUT, 1); - final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor()); final String unroutableIp = "10.255.255.1"; - try { + try (final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), Executors.directExecutor())) { restClient.sendRequest( unroutableIp, 80,
