[FLINK-4580] [rpc] Report rpc invocation exceptions to the caller

This closes #2526.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/59d9e67b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/59d9e67b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/59d9e67b

Branch: refs/heads/flip-6
Commit: 59d9e67b5811da2a864f7784a685e3c829f4f039
Parents: 360eaf8
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Sep 21 15:18:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:41 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    | 53 ++++++++++++--------
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 34 +++++++++++++
 2 files changed, 66 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/59d9e67b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index 59daa46..1b456a7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,11 +87,11 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
                        unstashAll();
                        getContext().become(new Procedure<Object>() {
                                @Override
-                               public void apply(Object message) throws Exception {
-                                       if (message.equals(Processing.STOP)) {
+                               public void apply(Object msg) throws Exception {
+                                       if (msg.equals(Processing.STOP)) {
                                                getContext().unbecome();
                                        } else {
-                                               handleMessage(message);
+                                               handleMessage(msg);
                                        }
                                }
                        });
@@ -130,21 +131,36 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
         * @param rpcInvocation Rpc invocation message
         */
        private void handleRpcInvocation(RpcInvocation rpcInvocation) {
+               Method rpcMethod = null;
+
                try {
                        String methodName = rpcInvocation.getMethodName();
                        Class<?>[] parameterTypes = rpcInvocation.getParameterTypes();
 
-                       Method rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+                       rpcMethod = lookupRpcMethod(methodName, parameterTypes);
+               } catch(ClassNotFoundException e) {
+                       LOG.error("Could not load method arguments.", e);
+
+                       RpcConnectionException rpcException = new RpcConnectionException("Could not load method arguments.", e);
+                       getSender().tell(new Status.Failure(rpcException), getSelf());
+               } catch (IOException e) {
+                       LOG.error("Could not deserialize rpc invocation message.", e);
+
+                       RpcConnectionException rpcException = new RpcConnectionException("Could not deserialize rpc invocation message.", e);
+                       getSender().tell(new Status.Failure(rpcException), getSelf());
+               } catch (final NoSuchMethodException e) {
+                       LOG.error("Could not find rpc method for rpc invocation.", e);
+
+                       RpcConnectionException rpcException = new RpcConnectionException("Could not find rpc method for rpc invocation.", e);
+                       getSender().tell(new Status.Failure(rpcException), getSelf());
+               }
 
-                       if (rpcMethod.getReturnType().equals(Void.TYPE)) {
-                               // No return value to send back
-                               try {
+               if (rpcMethod != null) {
+                       try {
+                               if (rpcMethod.getReturnType().equals(Void.TYPE)) {
+                                       // No return value to send back
                                        rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
-                               } catch (Throwable e) {
-                                       LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
-                               }
-                       } else {
-                               try {
+                               } else {
                                        Object result = rpcMethod.invoke(rpcEndpoint, rpcInvocation.getArgs());
 
                                        if (result instanceof Future) {
@@ -169,17 +185,12 @@ class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends Untyp
                                                // tell the sender the result of the computation
                                                getSender().tell(new Status.Success(result), getSelf());
                                        }
-                               } catch (Throwable e) {
-                                       // tell the sender about the failure
-                                       getSender().tell(new Status.Failure(e), getSelf());
                                }
+                       } catch (Throwable e) {
+                               LOG.error("Error while executing remote procedure call {}.", rpcMethod, e);
+                               // tell the sender about the failure
+                               getSender().tell(new Status.Failure(e), getSelf());
                        }
-               } catch(ClassNotFoundException e) {
-                       LOG.error("Could not load method arguments.", e);
-               } catch (IOException e) {
-                       LOG.error("Could not deserialize rpc invocation message.", e);
-               } catch (final NoSuchMethodException e) {
-                       LOG.error("Could not find rpc method for rpc invocation: {}.", rpcInvocation, e);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/59d9e67b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
index 5624d12..1e8c9a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -119,10 +119,44 @@ public class AkkaRpcActorTest extends TestLogger {
                rpcEndpoint.shutDown();
        }
 
+       /**
+        * Tests that we receive a RpcConnectionException when calling a rpc method (with return type)
+        * on a wrong rpc endpoint.
+        *
+        * @throws Exception
+        */
+       @Test
+       public void testWrongGatewayEndpointConnection() throws Exception {
+               DummyRpcEndpoint rpcEndpoint = new DummyRpcEndpoint(akkaRpcService);
+
+               rpcEndpoint.start();
+
+               Future<WrongRpcGateway> futureGateway = akkaRpcService.connect(rpcEndpoint.getAddress(), WrongRpcGateway.class);
+
+               WrongRpcGateway gateway = Await.result(futureGateway, timeout.duration());
+
+               // since it is a tell operation we won't receive a RpcConnectionException, it's only logged
+               gateway.tell("foobar");
+
+               Future<Boolean> result = gateway.barfoo();
+
+               try {
+                       Await.result(result, timeout.duration());
+                       fail("We expected a RpcConnectionException.");
+               } catch (RpcConnectionException rpcConnectionException) {
+                       // we expect this exception here
+               }
+       }
+
        private interface DummyRpcGateway extends RpcGateway {
                Future<Integer> foobar();
        }
 
+       private interface WrongRpcGateway extends RpcGateway {
+               Future<Boolean> barfoo();
+               void tell(String message);
+       }
+
        private static class DummyRpcEndpoint extends RpcEndpoint<DummyRpcGateway> {
 
                private volatile int _foobar = 42;

Reply via email to