This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 08e25db3326ebf9b9acb022dc5644528d44882ae
Author: Till Rohrmann <[email protected]>
AuthorDate: Wed Sep 26 16:51:58 2018 +0200

    [FLINK-10415] Fail requests with empty Netty pipeline in RestClient
    
    Sometimes it can happen that Netty does not properly initialize the channel
    pipeline when sending a request from the RestClient. In this situation, we
    need to fail the response so that the caller will be notified about the
    unsuccessful call.
    
    This closes #6763.
---
 .../runtime/rest/ConnectionClosedException.java     |  4 +---
 ...nIdleException.java => ConnectionException.java} | 14 +++++++-------
 .../flink/runtime/rest/ConnectionIdleException.java |  4 +---
 .../org/apache/flink/runtime/rest/RestClient.java   | 21 +++++++++++++++++----
 4 files changed, 26 insertions(+), 17 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
index b294f49..339a549 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rest;
 
-import java.io.IOException;
-
 /**
  * Exception which is thrown if the {@link RestClient} detects that a connection
  * was closed.
  */
-public class ConnectionClosedException extends IOException {
+public class ConnectionClosedException extends ConnectionException {
        private static final long serialVersionUID = 3802002501688542472L;
 
        public ConnectionClosedException(String message) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
similarity index 71%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
index 044bfce..d92c643 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java
@@ -21,22 +21,22 @@ package org.apache.flink.runtime.rest;
 import java.io.IOException;
 
 /**
- * Exception which is thrown by the {@link RestClient} if a connection
- * becomes idle.
+ * Base class for all connection-related exceptions thrown by the
+ * {@link RestClient}.
  */
-public class ConnectionIdleException extends IOException {
+public class ConnectionException extends IOException {
 
-       private static final long serialVersionUID = 5103778538635217293L;
+       private static final long serialVersionUID = -8483133957344173698L;
 
-       public ConnectionIdleException(String message) {
+       public ConnectionException(String message) {
                super(message);
        }
 
-       public ConnectionIdleException(String message, Throwable cause) {
+       public ConnectionException(String message, Throwable cause) {
                super(message, cause);
        }
 
-       public ConnectionIdleException(Throwable cause) {
+       public ConnectionException(Throwable cause) {
                super(cause);
        }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
index 044bfce..96c335d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java
@@ -18,13 +18,11 @@
 
 package org.apache.flink.runtime.rest;
 
-import java.io.IOException;
-
 /**
  * Exception which is thrown by the {@link RestClient} if a connection
  * becomes idle.
  */
-public class ConnectionIdleException extends IOException {
+public class ConnectionIdleException extends ConnectionException {
 
        private static final long serialVersionUID = 5103778538635217293L;
 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index ced2639..d4c5e88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
 import org.apache.flink.runtime.util.ExecutorThreadFactory;
 import org.apache.flink.util.AutoCloseableAsync;
 import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.Preconditions;
 
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser;
@@ -339,12 +338,26 @@ public class RestClient implements AutoCloseableAsync {
                        .thenComposeAsync(
                                channel -> {
                                        ClientHandler handler = channel.pipeline().get(ClientHandler.class);
-                                       CompletableFuture<JsonResponse> future = handler.getJsonFuture();
+
+                                       CompletableFuture<JsonResponse> future;
+                                       boolean success = false;
+
                                        try {
-                                               httpRequest.writeTo(channel);
+                                               if (handler == null) {
+                                                       throw new IOException("Netty pipeline was not properly initialized.");
+                                               } else {
+                                                       httpRequest.writeTo(channel);
+                                                       future = handler.getJsonFuture();
+                                                       success = true;
+                                               }
                                        } catch (IOException e) {
-                                               return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e));
+                                               future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e));
+                                       } finally {
+                                               if (!success) {
+                                                       channel.close();
+                                               }
                                        }
+
                                        return future;
                                },
                                executor)

Reply via email to