[FLINK-4382] [rpc] Buffer rpc calls until the RpcEndpoint has been started

This PR allows the AkkaRpcActor to stash messages until the corresponding
RpcEndpoint has been started. When receiving a Processing.START message, the
AkkaRpcActor unstashes all messages and starts processing rpcs. When receiving
a Processing.STOP message, it will stop processing messages and stash incoming
messages again.

Add test case for message stashing

This closes #2358.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afabd789
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afabd789
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afabd789

Branch: refs/heads/flip-6
Commit: afabd78987e31d882de929c80438f28196230368
Parents: d17afc1
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Thu Aug 11 18:13:25 2016 +0200
Committer: Till Rohrmann <trohrm...@apache.org>
Committed: Tue Sep 27 19:24:56 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   |  15 ++-
 .../flink/runtime/rpc/StartStoppable.java       |  35 ++++++
 .../runtime/rpc/akka/AkkaInvocationHandler.java |  21 +++-
 .../flink/runtime/rpc/akka/AkkaRpcActor.java    |  39 ++++++-
 .../flink/runtime/rpc/akka/AkkaRpcService.java  |   8 +-
 .../runtime/rpc/akka/messages/Processing.java   |  27 +++++
 .../flink/runtime/rpc/RpcCompletenessTest.java  |  45 +++++++-
 .../runtime/rpc/akka/AkkaRpcActorTest.java      | 108 +++++++++++++++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    |   3 +
 .../flink/runtime/rpc/akka/AsyncCallsTest.java  |   5 +-
 .../rpc/akka/MainThreadValidationTest.java      |   4 +-
 .../rpc/akka/MessageSerializationTest.java      |   4 +
 .../rpc/taskexecutor/TaskExecutorTest.java      |  18 ++++
 13 files changed, 315 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index d36a283..67ac182 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -74,7 +74,7 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 
        /** The main thread execution context to be used to execute future 
callbacks in the main thread
         * of the executing rpc server. */
-       private final MainThreadExecutionContext mainThreadExecutionContext;
+       private final ExecutionContext mainThreadExecutionContext;
 
        /** A reference to the endpoint's main thread, if the current method is 
called by the main thread */
        final AtomicReference<Thread> currentMainThread = new 
AtomicReference<>(null); 
@@ -106,10 +106,21 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
        }
        
        // 
------------------------------------------------------------------------
-       //  Shutdown
+       //  Start & Shutdown
        // 
------------------------------------------------------------------------
 
        /**
+        * Starts the rpc endpoint. This tells the underlying rpc server that 
the rpc endpoint is ready
+        * to process remote procedure calls.
+        *
+        * IMPORTANT: Whenever you override this method, call the parent 
implementation to enable
+        * rpc processing. It is advised to make the parent call last.
+        */
+       public void start() {
+               ((StartStoppable) self).start();
+       }
+
+       /**
         * Shuts down the underlying RPC endpoint via the RPC service.
         * After this method was called, the RPC endpoint will no longer be 
reachable, neither remotely,
         * not via its {@link #getSelf() self gateway}. It will also not 
accepts executions in main thread

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
new file mode 100644
index 0000000..dd5595f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/StartStoppable.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+/**
+ * Interface to start and stop the processing of rpc calls in the rpc server.
+ */
+public interface StartStoppable {
+
+       /**
+        * Starts the processing of remote procedure calls.
+        */
+       void start();
+
+       /**
+        * Stops the processing of remote procedure calls.
+        */
+       void stop();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
index 297104b..524bf74 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaInvocationHandler.java
@@ -24,8 +24,10 @@ import akka.util.Timeout;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcTimeout;
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
 import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RemoteRpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
@@ -50,7 +52,7 @@ import static 
org.apache.flink.util.Preconditions.checkArgument;
  * rpc in a {@link LocalRpcInvocation} message and then sends it to the {@link 
AkkaRpcActor} where it is
  * executed.
  */
-class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor {
+class AkkaInvocationHandler implements InvocationHandler, AkkaGateway, 
MainThreadExecutor, StartStoppable {
        private static final Logger LOG = 
Logger.getLogger(AkkaInvocationHandler.class);
 
        private final ActorRef rpcEndpoint;
@@ -76,7 +78,8 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
 
                Object result;
 
-               if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutor.class) || 
declaringClass.equals(Object.class)) {
+               if (declaringClass.equals(AkkaGateway.class) || 
declaringClass.equals(MainThreadExecutor.class) ||
+                       declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class)) {
                        result = method.invoke(this, args);
                } else {
                        String methodName = method.getName();
@@ -171,6 +174,20 @@ class AkkaInvocationHandler implements InvocationHandler, 
AkkaGateway, MainThrea
                }
        }
 
+       @Override
+       public void start() {
+               rpcEndpoint.tell(Processing.START, ActorRef.noSender());
+       }
+
+       @Override
+       public void stop() {
+               rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Helper methods
+       // 
------------------------------------------------------------------------
+
        /**
         * Extracts the {@link RpcTimeout} annotated rpc timeout value from the 
list of given method
         * arguments. If no {@link RpcTimeout} annotated parameter could be 
found, then the default

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
index dfcbcc3..2373be9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActor.java
@@ -20,13 +20,15 @@ package org.apache.flink.runtime.rpc.akka;
 
 import akka.actor.ActorRef;
 import akka.actor.Status;
-import akka.actor.UntypedActor;
+import akka.actor.UntypedActorWithStash;
+import akka.japi.Procedure;
 import akka.pattern.Patterns;
 import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.akka.messages.CallAsync;
 import org.apache.flink.runtime.rpc.akka.messages.LocalRpcInvocation;
+import org.apache.flink.runtime.rpc.akka.messages.Processing;
 import org.apache.flink.runtime.rpc.akka.messages.RpcInvocation;
 import org.apache.flink.runtime.rpc.akka.messages.RunAsync;
 
@@ -45,18 +47,23 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} 
and {@link CallAsync}
- * messages.
+ * {@link Processing} messages.
  * <p>
  * The {@link LocalRpcInvocation} designates a rpc and is dispatched to the 
given {@link RpcEndpoint}
  * instance.
  * <p>
  * The {@link RunAsync} and {@link CallAsync} messages contain executable code 
which is executed
  * in the context of the actor thread.
+ * <p>
+ * The {@link Processing} message controls the processing behaviour of the 
akka rpc actor. A
+ * {@link Processing#START} message unstashes all stashed messages and starts 
processing incoming
+ * messages. A {@link Processing#STOP} message stops processing messages and 
stashes incoming
+ * messages.
  *
  * @param <C> Type of the {@link RpcGateway} associated with the {@link 
RpcEndpoint}
  * @param <T> Type of the {@link RpcEndpoint}
  */
-class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends 
UntypedActor {
+class AkkaRpcActor<C extends RpcGateway, T extends RpcEndpoint<C>> extends 
UntypedActorWithStash {
        
        private static final Logger LOG = 
LoggerFactory.getLogger(AkkaRpcActor.class);
 
@@ -73,6 +80,27 @@ class AkkaRpcActor<C extends RpcGateway, T extends 
RpcEndpoint<C>> extends Untyp
 
        @Override
        public void onReceive(final Object message) {
+               if (message.equals(Processing.START)) {
+                       unstashAll();
+                       getContext().become(new Procedure<Object>() {
+                               @Override
+                               public void apply(Object message) throws 
Exception {
+                                       if (message.equals(Processing.STOP)) {
+                                               getContext().unbecome();
+                                       } else {
+                                               handleMessage(message);
+                                       }
+                               }
+                       });
+               } else {
+                       LOG.info("The rpc endpoint {} has not been started yet. 
Stashing message {} until processing is started.",
+                               rpcEndpoint.getClass().getName(),
+                               message.getClass().getName());
+                       stash();
+               }
+       }
+
+       private void handleMessage(Object message) {
                mainThreadValidator.enterMainThread();
                try {
                        if (message instanceof RunAsync) {
@@ -82,7 +110,10 @@ class AkkaRpcActor<C extends RpcGateway, T extends 
RpcEndpoint<C>> extends Untyp
                        } else if (message instanceof RpcInvocation) {
                                handleRpcInvocation((RpcInvocation) message);
                        } else {
-                               LOG.warn("Received message of unknown type {}. 
Dropping this message!", message.getClass());
+                               LOG.warn(
+                                       "Received message of unknown type {} 
with value {}. Dropping this message!",
+                                       message.getClass().getName(),
+                                       message);
                        }
                } finally {
                        mainThreadValidator.exitMainThread();

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index b963c53..7b33524 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.rpc.MainThreadExecutor;
 import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
-
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -136,7 +136,11 @@ public class AkkaRpcService implements RpcService {
                @SuppressWarnings("unchecked")
                C self = (C) Proxy.newProxyInstance(
                        classLoader,
-                       new Class<?>[]{rpcEndpoint.getSelfGatewayType(), 
MainThreadExecutor.class, AkkaGateway.class},
+                       new Class<?>[]{
+                               rpcEndpoint.getSelfGatewayType(),
+                               MainThreadExecutor.class,
+                               StartStoppable.class,
+                               AkkaGateway.class},
                        akkaInvocationHandler);
 
                return self;

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
new file mode 100644
index 0000000..5c7df5d
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/messages/Processing.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka.messages;
+
+/**
+ * Controls the processing behaviour of the {@link 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor}
+ */
+public enum Processing {
+       START, // Unstashes all stashed messages and starts processing incoming 
messages
+       STOP // Stop processing messages and stashes all incoming messages
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index e50533e..97cf0cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.runtime.rpc;
 
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
@@ -140,7 +142,7 @@ public class RpcCompletenessTest extends TestLogger {
                int rpcTimeoutParameters = 0;
 
                for (int i = 0; i < parameterAnnotations.length; i++) {
-                       if (isRpcTimeout(parameterAnnotations[i])) {
+                       if 
(RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
                                assertTrue(
                                        "The rpc timeout has to be of type " + 
FiniteDuration.class.getName() + ".",
                                        
parameterTypes[i].equals(FiniteDuration.class));
@@ -185,7 +187,7 @@ public class RpcCompletenessTest extends TestLogger {
 
                // filter out the RpcTimeout parameters
                for (int i = 0; i < gatewayParameterTypes.length; i++) {
-                       if (!isRpcTimeout(gatewayParameterAnnotations[i])) {
+                       if 
(!RpcCompletenessTest.isRpcTimeout(gatewayParameterAnnotations[i])) {
                                
filteredGatewayParameterTypes.add(gatewayParameterTypes[i]);
                        }
                }
@@ -235,7 +237,22 @@ public class RpcCompletenessTest extends TestLogger {
        }
 
        private boolean checkType(Class<?> firstType, Class<?> secondType) {
-               return firstType.equals(secondType);
+               Class<?> firstResolvedType;
+               Class<?> secondResolvedType;
+
+               if (firstType.isPrimitive()) {
+                       firstResolvedType = 
RpcCompletenessTest.resolvePrimitiveType(firstType);
+               } else {
+                       firstResolvedType = firstType;
+               }
+
+               if (secondType.isPrimitive()) {
+                       secondResolvedType = 
RpcCompletenessTest.resolvePrimitiveType(secondType);
+               } else {
+                       secondResolvedType = secondType;
+               }
+
+               return firstResolvedType.equals(secondResolvedType);
        }
 
        /**
@@ -279,7 +296,7 @@ public class RpcCompletenessTest extends TestLogger {
 
                for (int i = 0; i < parameterTypes.length; i++) {
                        // filter out the RpcTimeout parameters
-                       if (!isRpcTimeout(parameterAnnotations[i])) {
+                       if 
(!RpcCompletenessTest.isRpcTimeout(parameterAnnotations[i])) {
                                builder.append(parameterTypes[i].getName());
 
                                if (i < parameterTypes.length -1) {
@@ -293,7 +310,7 @@ public class RpcCompletenessTest extends TestLogger {
                return builder.toString();
        }
 
-       private boolean isRpcTimeout(Annotation[] annotations) {
+       private static boolean isRpcTimeout(Annotation[] annotations) {
                for (Annotation annotation : annotations) {
                        if 
(annotation.annotationType().equals(RpcTimeout.class)) {
                                return true;
@@ -302,4 +319,22 @@ public class RpcCompletenessTest extends TestLogger {
 
                return false;
        }
+
+       /**
+        * Returns the boxed type for a primitive type.
+        *
+        * @param primitveType Primitive type to resolve
+        * @return Boxed type for the given primitive type
+        */
+       private static Class<?> resolvePrimitiveType(Class<?> primitveType) {
+               assert primitveType.isPrimitive();
+
+               TypeInformation<?> typeInformation = 
BasicTypeInfo.getInfoFor(primitveType);
+
+               if (typeInformation != null) {
+                       return typeInformation.getTypeClass();
+               } else {
+                       throw new RuntimeException("Could not retrive basic 
type information for primitive type " + primitveType + '.');
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
new file mode 100644
index 0000000..1653fac
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcActorTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.akka;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcMethod;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.TestLogger;
+import org.hamcrest.core.Is;
+import org.junit.AfterClass;
+import org.junit.Test;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertThat;
+
+public class AkkaRpcActorTest extends TestLogger {
+
+       // 
------------------------------------------------------------------------
+       //  shared test members
+       // 
------------------------------------------------------------------------
+
+       private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+
+       private static Timeout timeout = new Timeout(10000, 
TimeUnit.MILLISECONDS);
+
+       private static AkkaRpcService akkaRpcService =
+               new AkkaRpcService(actorSystem, timeout);
+
+       @AfterClass
+       public static void shutdown() {
+               akkaRpcService.stopService();
+               actorSystem.shutdown();
+               actorSystem.awaitTermination();
+       }
+
+       /**
+        * Tests that the {@link AkkaRpcActor} stashes messages until the 
corresponding
+        * {@link RpcEndpoint} has been started.
+        */
+       @Test
+       public void testMessageStashing() throws Exception {
+               int expectedValue = 1337;
+
+               DummyRpcEndpoint rpcEndpoint = new 
DummyRpcEndpoint(akkaRpcService);
+
+               DummyRpcGateway rpcGateway = rpcEndpoint.getSelf();
+
+               // this message should not be processed until we've started the 
rpc endpoint
+               Future<Integer> result = rpcGateway.foobar();
+
+               // set a new value which we expect to be returned
+               rpcEndpoint.setFoobar(expectedValue);
+
+               // now process the rpc
+               rpcEndpoint.start();
+
+               Integer actualValue = Await.result(result, timeout.duration());
+
+               assertThat("The new foobar value should have been returned.", 
actualValue, Is.is(expectedValue));
+
+               rpcEndpoint.shutDown();
+       }
+
+       private interface DummyRpcGateway extends RpcGateway {
+               Future<Integer> foobar();
+       }
+
+       private static class DummyRpcEndpoint extends 
RpcEndpoint<DummyRpcGateway> {
+
+               private volatile int _foobar = 42;
+
+               protected DummyRpcEndpoint(RpcService rpcService) {
+                       super(rpcService);
+               }
+
+               @RpcMethod
+               public int foobar() {
+                       return _foobar;
+               }
+
+               public void setFoobar(int value) {
+                       _foobar = value;
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index f26b40b..fd55904 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -57,6 +57,9 @@ public class AkkaRpcServiceTest extends TestLogger {
                ResourceManager resourceManager = new 
ResourceManager(akkaRpcService, executorService);
                JobMaster jobMaster = new JobMaster(akkaRpcService2, 
executorService);
 
+               resourceManager.start();
+               jobMaster.start();
+
                ResourceManagerGateway rm = resourceManager.getSelf();
 
                assertTrue(rm instanceof AkkaGateway);

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
index f2ce52d..d33987c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AsyncCallsTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.AfterClass;
 import org.junit.Test;
 
@@ -42,7 +43,7 @@ import java.util.concurrent.locks.ReentrantLock;
 
 import static org.junit.Assert.*;
 
-public class AsyncCallsTest {
+public class AsyncCallsTest extends TestLogger {
 
        // 
------------------------------------------------------------------------
        //  shared test members
@@ -72,6 +73,7 @@ public class AsyncCallsTest {
                final AtomicBoolean concurrentAccess = new AtomicBoolean(false);
 
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
+               testEndpoint.start();
                TestGateway gateway = testEndpoint.getSelf();
 
                // a bunch of gateway calls
@@ -127,6 +129,7 @@ public class AsyncCallsTest {
                final long delay = 200;
 
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService, 
lock);
+               testEndpoint.start();
 
                // run something asynchronously
                testEndpoint.runAsync(new Runnable() {

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
index b854143..9ffafda 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MainThreadValidationTest.java
@@ -27,13 +27,14 @@ import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcService;
 
+import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertTrue;
 
-public class MainThreadValidationTest {
+public class MainThreadValidationTest extends TestLogger {
 
        @Test
        public void failIfNotInMainThread() {
@@ -51,6 +52,7 @@ public class MainThreadValidationTest {
 
                try {
                        TestEndpoint testEndpoint = new 
TestEndpoint(akkaRpcService);
+                       testEndpoint.start();
 
                        // this works, because it is executed as an RPC call
                        
testEndpoint.getSelf().someConcurrencyCriticalFunction();

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
index ca8179c..9d2ed99 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/MessageSerializationTest.java
@@ -86,6 +86,7 @@ public class MessageSerializationTest extends TestLogger {
        public void testNonSerializableLocalMessageTransfer() throws 
InterruptedException, IOException {
                LinkedBlockingQueue<Object> linkedBlockingQueue = new 
LinkedBlockingQueue<>();
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, 
linkedBlockingQueue);
+               testEndpoint.start();
 
                TestGateway testGateway = testEndpoint.getSelf();
 
@@ -106,6 +107,7 @@ public class MessageSerializationTest extends TestLogger {
                LinkedBlockingQueue<Object> linkedBlockingQueue = new 
LinkedBlockingQueue<>();
 
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, 
linkedBlockingQueue);
+               testEndpoint.start();
 
                String address = testEndpoint.getAddress();
 
@@ -126,6 +128,7 @@ public class MessageSerializationTest extends TestLogger {
                LinkedBlockingQueue<Object> linkedBlockingQueue = new 
LinkedBlockingQueue<>();
 
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, 
linkedBlockingQueue);
+               testEndpoint.start();
 
                String address = testEndpoint.getAddress();
 
@@ -149,6 +152,7 @@ public class MessageSerializationTest extends TestLogger {
                LinkedBlockingQueue<Object> linkedBlockingQueue = new 
LinkedBlockingQueue<>();
 
                TestEndpoint testEndpoint = new TestEndpoint(akkaRpcService1, 
linkedBlockingQueue);
+               testEndpoint.start();
 
                String address = testEndpoint.getAddress();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/afabd789/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
index 33c9cb6..c96f4f6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/taskexecutor/TaskExecutorTest.java
@@ -28,17 +28,26 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
 import org.apache.flink.runtime.util.DirectExecutorService;
 import org.apache.flink.util.SerializedValue;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.cglib.proxy.InvocationHandler;
+import org.mockito.cglib.proxy.Proxy;
+import scala.concurrent.Future;
 
 import java.net.URL;
 import java.util.Collections;
 
 import static org.junit.Assert.fail;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 public class TaskExecutorTest extends TestLogger {
 
@@ -48,8 +57,13 @@ public class TaskExecutorTest extends TestLogger {
        @Test
        public void testTaskExecution() throws Exception {
                RpcService testingRpcService = mock(RpcService.class);
+               InvocationHandler invocationHandler = 
mock(InvocationHandler.class);
+               Object selfGateway = 
Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] 
{TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, 
invocationHandler);
+               
when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
+
                DirectExecutorService directExecutorService = new 
DirectExecutorService();
                TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, 
directExecutorService);
+               taskExecutor.start();
 
                TaskDeploymentDescriptor tdd = new TaskDeploymentDescriptor(
                        new JobID(),
@@ -82,8 +96,12 @@ public class TaskExecutorTest extends TestLogger {
        @Test(expected=Exception.class)
        public void testWrongTaskCancellation() throws Exception {
                RpcService testingRpcService = mock(RpcService.class);
+               InvocationHandler invocationHandler = 
mock(InvocationHandler.class);
+               Object selfGateway = 
Proxy.newProxyInstance(ClassLoader.getSystemClassLoader(), new Class<?>[] 
{TaskExecutorGateway.class, MainThreadExecutor.class, StartStoppable.class}, 
invocationHandler);
+               
when(testingRpcService.startServer(Matchers.any(RpcEndpoint.class))).thenReturn((RpcGateway)selfGateway);
                DirectExecutorService directExecutorService = null;
                TaskExecutor taskExecutor = new TaskExecutor(testingRpcService, 
directExecutorService);
+               taskExecutor.start();
 
                taskExecutor.cancelTask(new ExecutionAttemptID());
 

Reply via email to