[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller. This closes #2526.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59d9e67b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59d9e67b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59d9e67b Branch: refs/heads/flip-6 Commit: 59d9e67b5811da2a864f7784a685e3c829f4f039 Parents: 360eaf8 Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Sep 21 15:18:27 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:41 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/rpc/akka/AkkaRpcActor.java | 53 ++++++++++++-------- .../runtime/rpc/akka/AkkaRpcActorTest.java | 34 +++++++++++++ 2 files changed, 66 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/59d9e67b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java index 59daa46..1b456a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.akka.messages.Processing; import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation; import org.apache.flink.runtime.rpc.akka.messages.RunAsync; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,11 +87,11 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp unstashAll(); getContext().become(new Procedure<Object>() { @Override - public void apply(Object message) throws Exception { - if 
(message.equals(Processing.STOP)) { + public void apply(Object msg) throws Exception { + if (msg.equals(Processing.STOP)) { getContext().unbecome(); } else { - handleMessage(message); + handleMessage(msg); } } }); @@ -130,21 +131,36 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp * @param rpcInvocation Rpc invocation message */ private void handleRpcInvocation(RpcInvocation rpcInvocation) { + Method rpcMethod = null; + try { String methodName = rpcInvocation.getMethodName(); Class<?>[] parameterTypes = rpcInvocation.getParameterTypes(); - Method rpcMethod = lookupRpcMethod(methodName, parameterTypes); + rpcMethod = lookupRpcMethod(methodName, parameterTypes); + } catch(ClassNotFoundException e) { + LOG.error("Could not load method arguments.", e); + + RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e); + getSender().tell(new Status.Failure(rpcException), getSelf()); + } catch (IOException e) { + LOG.error("Could not deserialize rpc invocation message.", e); + + RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e); + getSender().tell(new Status.Failure(rpcException), getSelf()); + } catch (final NoSuchMethodException e) { + LOG.error("Could not find rpc method for rpc invocation.", e); + + RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e); + getSender().tell(new Status.Failure(rpcException), getSelf()); + } - if (rpcMethod.getReturnType().equals(Void.TYPE)) { - // No return value to send back - try { + if (rpcMethod != null) { + try { + if (rpcMethod.getReturnType().equals(Void.TYPE)) { + // No return value to send back rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); - } catch (Throwable e) { - LOG.error("Error while executing remote procedure call {}.", rpcMethod, e); - } - } else { - try { + } else { Object result = 
rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs()); if (result instanceof Future) { @@ -169,17 +185,12 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp // tell the sender the result of the computation getSender().tell(new Status.Success(result), getSelf()); } - } catch (Throwable e) { - // tell the sender about the failure - getSender().tell(new Status.Failure(e), getSelf()); } + } catch (Throwable e) { + LOG.error("Error while executing remote procedure call {}.", rpcMethod, e); + // tell the sender about the failure + getSender().tell(new Status.Failure(e), getSelf()); } - } catch(ClassNotFoundException e) { - LOG.error("Could not load method arguments.", e); - } catch (IOException e) { - LOG.error("Could not deserialize rpc invocation message.", e); - } catch (final NoSuchMethodException e) { - LOG.error("Could not find rpc method for rpc invocation: {}.", rpcInvocation, e); } } http://git-wip-us.apache.org/repos/asf/flink/blob/59d9e67b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java index 5624d12..1e8c9a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java @@ -119,10 +119,44 @@ public class AkkaRpcActorTest extends TestLogger { rpcEndpoint.shutDown(); } + /** + * Tests that we receive a RpcConnectionException when calling a rpc method (with return type) + * on a wrong rpc endpoint. 
+ * + * @throws Exception + */ + @Test + public void testWrongGatewayEndpointConnection() throws Exception { + DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService); + + rpcEndpoint.start(); + + Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class); + + WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration()); + + // since it is a tell operation we won't receive a RpcConnectionException, it's only logged + gateway.tell("foobar"); + + Future<Boolean> result = gateway.barfoo(); + + try { + Await.result(result, timeout.duration()); + fail("We expected a RpcConnectionException."); + } catch (RpcConnectionException rpcConnectionException) { + // we expect this exception here + } + } + private interface DummyRpcGateway extends RpcGateway { Future<Integer> foobar(); } + private interface WrongRpcGateway extends RpcGateway { + Future<Boolean> barfoo(); + void tell(String message); + } + private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> { private volatile int _foobar = 42;