[FLINK-4451] [rpc] Throw RpcConnectionException when rpc endpoint is not 
reachable

This PR introduces a RpcConnectionException which is thrown if the rpc endpoint
is not reachable when calling RpcService.connect.

This closes #2405.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be561f5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be561f5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be561f5

Branch: refs/heads/flip-6
Commit: 3be561f57dab448536e41636997506f5f12aea18
Parents: 6f9936b
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Tue Aug 23 17:59:54 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200

----------------------------------------------------------------------
 .../registration/RetryingRegistration.java      |  2 +-
 .../apache/flink/runtime/rpc/RpcService.java    |  7 +++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 38 +++++++++++-------
 .../rpc/exceptions/RpcConnectionException.java  | 41 ++++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 18 +++++++++
 5 files changed, 88 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index 88fe9b5..ea49e42 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -197,7 +197,7 @@ public abstract class RetryingRegistration<Gateway extends 
RpcGateway, Success e
                                @Override
                                public void onFailure(Throwable failure) {
                                        if (!isCanceled()) {
-                                               log.warn("Could not resolve {} 
address {}, retrying...", targetName, targetAddress);
+                                               log.warn("Could not resolve {} 
address {}, retrying...", targetName, targetAddress, failure);
                                                startRegistration();
                                        }
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index bc0f5cb..78c1cec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import scala.concurrent.ExecutionContext;
 import scala.concurrent.Future;
 
@@ -32,12 +33,14 @@ public interface RpcService {
 
        /**
         * Connect to a remote rpc server under the provided address. Returns a 
rpc gateway which can
-        * be used to communicate with the rpc server.
+        * be used to communicate with the rpc server. If the connection 
failed, then the returned
+        * future is failed with a {@link RpcConnectionException}.
         *
         * @param address Address of the remote rpc server
         * @param clazz Class of the rpc gateway to return
         * @param <C> Type of the rpc gateway to return
-        * @return Future containing the rpc gateway
+        * @return Future containing the rpc gateway or an {@link 
RpcConnectionException} if the
+        * connection attempt failed
         */
        <C extends RpcGateway> Future<C> connect(String address, Class<C> 
clazz);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 00a6932..060a1ef 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.StartStoppable;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -99,25 +100,32 @@ public class AkkaRpcService implements RpcService {
                final Future<Object> identify = asker.ask(new Identify(42), 
timeout);
                return identify.map(new Mapper<Object, C>(){
                        @Override
-                       public C apply(Object obj) {
-                               ActorRef actorRef = ((ActorIdentity) 
obj).getRef();
+                       public C checkedApply(Object obj) throws Exception {
 
-                               final String address = 
AkkaUtils.getAkkaURL(actorSystem, actorRef);
+                               ActorIdentity actorIdentity = (ActorIdentity) 
obj;
 
-                               InvocationHandler akkaInvocationHandler = new 
AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
+                               if (actorIdentity.getRef() == null) {
+                                       throw new RpcConnectionException("Could 
not connect to rpc endpoint under address " + address + '.');
+                               } else {
+                                       ActorRef actorRef = 
actorIdentity.getRef();
+
+                                       final String address = 
AkkaUtils.getAkkaURL(actorSystem, actorRef);
+
+                                       InvocationHandler akkaInvocationHandler 
= new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize);
 
-                               // Rather than using the System ClassLoader 
directly, we derive the ClassLoader
-                               // from this class . That works better in cases 
where Flink runs embedded and all Flink
-                               // code is loaded dynamically (for example from 
an OSGI bundle) through a custom ClassLoader
-                               ClassLoader classLoader = 
AkkaRpcService.this.getClass().getClassLoader();
-                               
-                               @SuppressWarnings("unchecked")
-                               C proxy = (C) Proxy.newProxyInstance(
-                                       classLoader,
-                                       new Class<?>[] {clazz},
-                                       akkaInvocationHandler);
+                                       // Rather than using the System 
ClassLoader directly, we derive the ClassLoader
+                                       // from this class . That works better 
in cases where Flink runs embedded and all Flink
+                                       // code is loaded dynamically (for 
example from an OSGI bundle) through a custom ClassLoader
+                                       ClassLoader classLoader = 
AkkaRpcService.this.getClass().getClassLoader();
 
-                               return proxy;
+                                       @SuppressWarnings("unchecked")
+                                       C proxy = (C) Proxy.newProxyInstance(
+                                               classLoader,
+                                               new Class<?>[]{clazz},
+                                               akkaInvocationHandler);
+
+                                       return proxy;
+                               }
                        }
                }, actorSystem.dispatcher());
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
new file mode 100644
index 0000000..a22ebe7
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.exceptions;
+
+import java.util.concurrent.ExecutionException;
+
+/**
+ * Exception class which is thrown if a rpc connection failed. Usually this 
happens if the remote
+ * host cannot be reached.
+ */
+public class RpcConnectionException extends ExecutionException {
+       private static final long serialVersionUID = -5500560405481142472L;
+
+       public RpcConnectionException(String message) {
+               super(message);
+       }
+
+       public RpcConnectionException(String message, Throwable cause) {
+               super(message, cause);
+       }
+
+       public RpcConnectionException(Throwable cause) {
+               super(cause);
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 82d13f0..a6ceb91 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.apache.flink.util.TestLogger;
 import org.hamcrest.core.Is;
 import org.junit.AfterClass;
@@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
 
 public class AkkaRpcActorTest extends TestLogger {
 
@@ -73,6 +75,22 @@ public class AkkaRpcActorTest extends TestLogger {
        }
 
        /**
+        * Tests that a {@link RpcConnectionException} is thrown if the rpc 
endpoint cannot be connected to.
+        */
+       @Test
+       public void testFailingAddressResolution() throws Exception {
+               Future<DummyRpcGateway> futureRpcGateway = 
akkaRpcService.connect("foobar", DummyRpcGateway.class);
+
+               try {
+                       DummyRpcGateway gateway = 
Await.result(futureRpcGateway, timeout.duration());
+
+                       fail("The rpc connection resolution should have 
failed.");
+               } catch (RpcConnectionException exception) {
+                       // we're expecting a RpcConnectionException
+               }
+       }
+
+       /**
         * Tests that the {@link AkkaRpcActor} stashes messages until the 
corresponding
         * {@link RpcEndpoint} has been started.
         */

Reply via email to