[FLINK-4451] [rpc] Throw RpcConnectionException when rpc endpoint is not reachable
This PR introduces a RpcConnectionException which is thrown if the rpc endpoint is not reachable when calling RpcService.connect. This closes #2405. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3be561f5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3be561f5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3be561f5 Branch: refs/heads/flip-6 Commit: 3be561f57dab448536e41636997506f5f12aea18 Parents: 6f9936b Author: Till Rohrmann <trohrm...@apache.org> Authored: Tue Aug 23 17:59:54 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:40 2016 +0200 ---------------------------------------------------------------------- .../registration/RetryingRegistration.java | 2 +- .../apache/flink/runtime/rpc/RpcService.java | 7 +++- .../flink/runtime/rpc/akka/AkkaRpcService.java | 38 +++++++++++------- .../rpc/exceptions/RpcConnectionException.java | 41 ++++++++++++++++++++ .../runtime/rpc/akka/AkkaRpcActorTest.java | 18 +++++++++ 5 files changed, 88 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index 88fe9b5..ea49e42 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -197,7 +197,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e @Override public void onFailure(Throwable failure) { if (!isCanceled()) { - log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress); + log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure); startRegistration(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index bc0f5cb..78c1cec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.rpc; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import scala.concurrent.ExecutionContext; import scala.concurrent.Future; @@ -32,12 +33,14 @@ public interface RpcService { /** * Connect to a remote rpc server under the provided address. Returns a rpc gateway which can - * be used to communicate with the rpc server. + * be used to communicate with the rpc server. If the connection failed, then the returned + * future is failed with a {@link RpcConnectionException}. * * @param address Address of the remote rpc server * @param clazz Class of the rpc gateway to return * @param <C> Type of the rpc gateway to return - * @return Future containing the rpc gateway + * @return Future containing the rpc gateway or an {@link RpcConnectionException} if the + * connection attempt failed */ <C extends RpcGateway> Future<C> connect(String address, Class<C> clazz); http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 00a6932..060a1ef 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.StartStoppable; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -99,25 +100,32 @@ public class AkkaRpcService implements RpcService { final Future<Object> identify = asker.ask(new Identify(42), timeout); return identify.map(new Mapper<Object, C>(){ @Override - public C apply(Object obj) { - ActorRef actorRef = ((ActorIdentity) obj).getRef(); + public C checkedApply(Object obj) throws Exception { - final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); + ActorIdentity actorIdentity = (ActorIdentity) obj; - InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); + if (actorIdentity.getRef() == null) { + throw new RpcConnectionException("Could not connect to rpc endpoint under address " + address + '.'); + } else { + ActorRef actorRef = actorIdentity.getRef(); + + final String address = AkkaUtils.getAkkaURL(actorSystem, actorRef); + + InvocationHandler akkaInvocationHandler = new AkkaInvocationHandler(address, actorRef, timeout, maximumFramesize); - // Rather than using the System ClassLoader directly, we derive the ClassLoader - // from this class . That works better in cases where Flink runs embedded and all Flink - // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader - ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader(); - - @SuppressWarnings("unchecked") - C proxy = (C) Proxy.newProxyInstance( - classLoader, - new Class<?>[] {clazz}, - akkaInvocationHandler); + // Rather than using the System ClassLoader directly, we derive the ClassLoader + // from this class . That works better in cases where Flink runs embedded and all Flink + // code is loaded dynamically (for example from an OSGI bundle) through a custom ClassLoader + ClassLoader classLoader = AkkaRpcService.this.getClass().getClassLoader(); - return proxy; + @SuppressWarnings("unchecked") + C proxy = (C) Proxy.newProxyInstance( + classLoader, + new Class<?>[]{clazz}, + akkaInvocationHandler); + + return proxy; + } } }, actorSystem.dispatcher()); } http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java new file mode 100644 index 0000000..a22ebe7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/exceptions/RpcConnectionException.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc.exceptions; + +import java.util.concurrent.ExecutionException; + +/** + * Exception class which is thrown if a rpc connection failed. Usually this happens if the remote + * host cannot be reached. + */ +public class RpcConnectionException extends ExecutionException { + private static final long serialVersionUID = -5500560405481142472L; + + public RpcConnectionException(String message) { + super(message); + } + + public RpcConnectionException(String message, Throwable cause) { + super(message, cause); + } + + public RpcConnectionException(Throwable cause) { + super(cause); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/3be561f5/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 82d13f0..a6ceb91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcMethod; import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.apache.flink.util.TestLogger; import org.hamcrest.core.Is; import org.junit.AfterClass; @@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.fail; public class AkkaRpcActorTest extends TestLogger { @@ -73,6 +75,22 @@ public class AkkaRpcActorTest extends TestLogger { } /** + * Tests that a {@link RpcConnectionException} is thrown if the rpc endpoint cannot be connected to. + */ + @Test + public void testFailingAddressResolution() throws Exception { + Future<DummyRpcGateway> futureRpcGateway = akkaRpcService.connect("foobar", DummyRpcGateway.class); + + try { + DummyRpcGateway gateway = Await.result(futureRpcGateway, timeout.duration()); + + fail("The rpc connection resolution should have failed."); + } catch (RpcConnectionException exception) { + // we're expecting a RpcConnectionException + } + } + + /** * Tests that the {@link AkkaRpcActor} stashes messages until the corresponding * {@link RpcEndpoint} has been started. */