This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8a046177dbfff63b3a9285885ad70f85c22c99ca
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Sep 26 13:44:03 2018 +0200

    [FLINK-10415] Fail response future if RestClient connection becomes idle
    
    This commit adds an IdleStateHandler to the Netty pipeline of the RestClient.
    The IdleStateHandler sends an IdleStateEvent if it detects that the connection
    is idle for too long. If we see an IdleStateEvent, then we close the connection
    and fail the JSON response future.
---
 docs/_includes/generated/rest_configuration.html   |  5 +++
 .../apache/flink/configuration/RestOptions.java    |  8 +++++
 .../runtime/rest/ConnectionIdleException.java      | 42 ++++++++++++++++++++++
 .../org/apache/flink/runtime/rest/RestClient.java  | 37 ++++++++++++++-----
 .../runtime/rest/RestClientConfiguration.java      | 17 +++++++--
 .../apache/flink/runtime/rest/RestClientTest.java  |  8 +++--
 6 files changed, 104 insertions(+), 13 deletions(-)

diff --git a/docs/_includes/generated/rest_configuration.html 
b/docs/_includes/generated/rest_configuration.html
index 1de4165..25da9cf 100644
--- a/docs/_includes/generated/rest_configuration.html
+++ b/docs/_includes/generated/rest_configuration.html
@@ -33,6 +33,11 @@
             <td>The maximum time in ms for the client to establish a TCP 
connection.</td>
         </tr>
         <tr>
+            <td><h5>rest.idleness-timeout</h5></td>
+            <td style="word-wrap: break-word;">300000</td>
+            <td>The maximum time in ms for a connection to stay idle before 
failing.</td>
+        </tr>
+        <tr>
             <td><h5>rest.port</h5></td>
             <td style="word-wrap: break-word;">8081</td>
             <td>The port that the server listens on / the client connects 
to.</td>
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java 
b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
index 3c24158..c834483 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/RestOptions.java
@@ -94,6 +94,14 @@ public class RestOptions {
                        .withDescription("The maximum time in ms for the client 
to establish a TCP connection.");
 
        /**
+        * The maximum time in ms for a connection to stay idle before failing.
+        */
+       public static final ConfigOption<Long> IDLENESS_TIMEOUT =
+               key("rest.idleness-timeout")
+                       .defaultValue(5L * 60L * 1_000L) // 5 minutes
+                       .withDescription("The maximum time in ms for a 
connection to stay idle before failing.");
+
+       /**
         * The maximum content length that the server will handle.
         */
        public static final ConfigOption<Integer> SERVER_MAX_CONTENT_LENGTH =
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
new file mode 100644
index 0000000..044bfce
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest;
+
+import java.io.IOException;
+
+/**
+ * Exception which is thrown by the {@link RestClient} if a connection
+ * becomes idle.
+ */
+public class ConnectionIdleException extends IOException {
+
+       private static final long serialVersionUID = 5103778538635217293L;
+
+       public ConnectionIdleException(String message) {
+               super(message);
+       }
+
+       public ConnectionIdleException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public ConnectionIdleException(Throwable cause) {
+               super(cause);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 16e4c98..e2d85e9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.rest.util.RestClientException;
 import org.apache.flink.runtime.rest.util.RestConstants;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.util.AutoCloseableAsync;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
@@ -70,6 +71,8 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.Http
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
 import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
+import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
+import 
org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;
 import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.DefaultThreadFactory;
 
 import org.slf4j.Logger;
@@ -116,16 +119,22 @@ public class RestClient implements AutoCloseableAsync {
                ChannelInitializer<SocketChannel> initializer = new 
ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) 
{
-                               // SSL should be the first handler in the 
pipeline
-                               if (sslEngineFactory != null) {
-                                       socketChannel.pipeline().addLast("ssl", 
new SslHandler(sslEngineFactory.createSSLEngine()));
-                               }
+                               try {
+                                       // SSL should be the first handler in 
the pipeline
+                                       if (sslEngineFactory != null) {
+                                               
socketChannel.pipeline().addLast("ssl", new 
SslHandler(sslEngineFactory.createSSLEngine()));
+                                       }
 
-                               socketChannel.pipeline()
-                                       .addLast(new HttpClientCodec())
-                                       .addLast(new 
HttpObjectAggregator(configuration.getMaxContentLength()))
-                                       .addLast(new ChunkedWriteHandler()) // 
required for multipart-requests
-                                       .addLast(new ClientHandler());
+                                       socketChannel.pipeline()
+                                               .addLast(new HttpClientCodec())
+                                               .addLast(new 
HttpObjectAggregator(configuration.getMaxContentLength()))
+                                               .addLast(new 
ChunkedWriteHandler()) // required for multipart-requests
+                                               .addLast(new 
IdleStateHandler(configuration.getIdlenessTimeout(), 
configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), 
TimeUnit.MILLISECONDS))
+                                               .addLast(new ClientHandler());
+                               } catch (Throwable t) {
+                                       t.printStackTrace();
+                                       ExceptionUtils.rethrow(t);
+                               }
                        }
                };
                NioEventLoopGroup group = new NioEventLoopGroup(1, new 
DefaultThreadFactory("flink-rest-client-netty"));
@@ -424,6 +433,16 @@ public class RestClient implements AutoCloseableAsync {
                }
 
                @Override
+               public void userEventTriggered(ChannelHandlerContext ctx, 
Object evt) throws Exception {
+                       if (evt instanceof IdleStateEvent) {
+                               jsonFuture.completeExceptionally(new 
ConnectionIdleException("Channel became idle."));
+                               ctx.close();
+                       } else {
+                               super.userEventTriggered(ctx, evt);
+                       }
+               }
+
+               @Override
                public void exceptionCaught(final ChannelHandlerContext ctx, 
final Throwable cause) {
                        if (cause instanceof TooLongFrameException) {
                                jsonFuture.completeExceptionally(new 
TooLongFrameException(String.format(
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
index e09f357..b70b1f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClientConfiguration.java
@@ -40,15 +40,19 @@ public final class RestClientConfiguration {
 
        private final long connectionTimeout;
 
+       private final long idlenessTimeout;
+
        private final int maxContentLength;
 
        private RestClientConfiguration(
                        @Nullable final SSLEngineFactory sslEngineFactory,
                        final long connectionTimeout,
+                       final long idlenessTimeout,
                        final int maxContentLength) {
                checkArgument(maxContentLength > 0, "maxContentLength must be 
positive, was: %d", maxContentLength);
                this.sslEngineFactory = sslEngineFactory;
                this.connectionTimeout = connectionTimeout;
+               this.idlenessTimeout = idlenessTimeout;
                this.maxContentLength = maxContentLength;
        }
 
@@ -63,13 +67,20 @@ public final class RestClientConfiguration {
        }
 
        /**
-        * @see RestOptions#CONNECTION_TIMEOUT
+        * See {@link RestOptions#CONNECTION_TIMEOUT}.
         */
        public long getConnectionTimeout() {
                return connectionTimeout;
        }
 
        /**
+        * See {@link RestOptions#IDLENESS_TIMEOUT}.
+        */
+       public long getIdlenessTimeout() {
+               return idlenessTimeout;
+       }
+
+       /**
         * Returns the max content length that the REST client endpoint could 
handle.
         *
         * @return max content length that the REST client endpoint could handle
@@ -102,8 +113,10 @@ public final class RestClientConfiguration {
 
                final long connectionTimeout = 
config.getLong(RestOptions.CONNECTION_TIMEOUT);
 
+               final long idlenessTimeout = 
config.getLong(RestOptions.IDLENESS_TIMEOUT);
+
                int maxContentLength = 
config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);
 
-               return new RestClientConfiguration(sslEngineFactory, 
connectionTimeout, maxContentLength);
+               return new RestClientConfiguration(sslEngineFactory, 
connectionTimeout, idlenessTimeout, maxContentLength);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
index 8650929..f670bec 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestClientTest.java
@@ -81,8 +81,10 @@ public class RestClientTest extends TestLogger {
         */
        @Test
        public void testConnectionClosedHandling() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
                try (final ServerSocket serverSocket = new ServerSocket(0);
-                       final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), 
TestingUtils.defaultExecutor())) {
+                       final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(config), 
TestingUtils.defaultExecutor())) {
 
                        final String targetAddress = "localhost";
                        final int targetPort = serverSocket.getLocalPort();
@@ -127,11 +129,13 @@ public class RestClientTest extends TestLogger {
         */
        @Test
        public void testRestClientClosedHandling() throws Exception {
+               final Configuration config = new Configuration();
+               config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
 
                Socket connectionSocket = null;
 
                try (final ServerSocket serverSocket = new ServerSocket(0);
-                       final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), 
TestingUtils.defaultExecutor())) {
+                       final RestClient restClient = new 
RestClient(RestClientConfiguration.fromConfiguration(config), 
TestingUtils.defaultExecutor())) {
 
                        final String targetAddress = "localhost";
                        final int targetPort = serverSocket.getLocalPort();

Reply via email to