This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 08e25db3326ebf9b9acb022dc5644528d44882ae Author: Till Rohrmann <[email protected]> AuthorDate: Wed Sep 26 16:51:58 2018 +0200 [FLINK-10415] Fail requests with empty Netty pipeline in RestClient Sometimes it can happen that Netty does not properly initialize the channel pipeline when sending a request from the RestClient. In this situation, we need to fail the response so that the caller will be notified about the unsuccessful call. This closes #6763. --- .../runtime/rest/ConnectionClosedException.java | 4 +--- ...nIdleException.java => ConnectionException.java} | 14 +++++++------- .../flink/runtime/rest/ConnectionIdleException.java | 4 +--- .../org/apache/flink/runtime/rest/RestClient.java | 21 +++++++++++++++++---- 4 files changed, 26 insertions(+), 17 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java index b294f49..339a549 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionClosedException.java @@ -18,13 +18,11 @@ package org.apache.flink.runtime.rest; -import java.io.IOException; - /** * Exception which is thrown if the {@link RestClient} detects that a connection * was closed. 
*/ -public class ConnectionClosedException extends IOException { +public class ConnectionClosedException extends ConnectionException { private static final long serialVersionUID = 3802002501688542472L; public ConnectionClosedException(String message) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java similarity index 71% copy from flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java index 044bfce..d92c643 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionException.java @@ -21,22 +21,22 @@ package org.apache.flink.runtime.rest; import java.io.IOException; /** - * Exception which is thrown by the {@link RestClient} if a connection - * becomes idle. + * Base class for all connection related exception thrown by the + * {@link RestClient}. 
*/ -public class ConnectionIdleException extends IOException { +public class ConnectionException extends IOException { - private static final long serialVersionUID = 5103778538635217293L; + private static final long serialVersionUID = -8483133957344173698L; - public ConnectionIdleException(String message) { + public ConnectionException(String message) { super(message); } - public ConnectionIdleException(String message, Throwable cause) { + public ConnectionException(String message, Throwable cause) { super(message, cause); } - public ConnectionIdleException(Throwable cause) { + public ConnectionException(Throwable cause) { super(cause); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java index 044bfce..96c335d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/ConnectionIdleException.java @@ -18,13 +18,11 @@ package org.apache.flink.runtime.rest; -import java.io.IOException; - /** * Exception which is thrown by the {@link RestClient} if a connection * becomes idle. 
*/ -public class ConnectionIdleException extends IOException { +public class ConnectionIdleException extends ConnectionException { private static final long serialVersionUID = 5103778538635217293L; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index ced2639..d4c5e88 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.rest.versioning.RestAPIVersion; import org.apache.flink.runtime.util.ExecutorThreadFactory; import org.apache.flink.util.AutoCloseableAsync; import org.apache.flink.util.ExceptionUtils; -import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParser; @@ -339,12 +338,26 @@ public class RestClient implements AutoCloseableAsync { .thenComposeAsync( channel -> { ClientHandler handler = channel.pipeline().get(ClientHandler.class); - CompletableFuture<JsonResponse> future = handler.getJsonFuture(); + + CompletableFuture<JsonResponse> future; + boolean success = false; + try { - httpRequest.writeTo(channel); + if (handler == null) { + throw new IOException("Netty pipeline was not properly initialized."); + } else { + httpRequest.writeTo(channel); + future = handler.getJsonFuture(); + success = true; + } } catch (IOException e) { - return FutureUtils.completedExceptionally(new FlinkException("Could not write request.", e)); + future = FutureUtils.completedExceptionally(new ConnectionException("Could not write request.", e)); + } finally { + if (!success) { + channel.close(); + } } + return future; }, executor)
