[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in its 
executor

This closes #2531.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4dc4741
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4dc4741
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4dc4741

Branch: refs/heads/flip-6
Commit: f4dc47411fc06b5f3ee0ff34c2ebf02bd459ffa8
Parents: 507e86c
Author: Till Rohrmann <trohrm...@apache.org>
Authored: Wed Sep 21 18:16:27 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:40 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcService.java    | 31 ++++++++++++++
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 14 ++++++
 .../flink/runtime/rpc/AsyncCallsTest.java       |  1 -
 .../runtime/rpc/TestingSerialRpcService.java    | 16 +++++++
 .../runtime/rpc/akka/AkkaRpcServiceTest.java    | 45 ++++++++++++++++++++
 5 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index a367ff2..437e08b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -89,4 +90,34 @@ public interface RpcService {
         * @param delay    The delay after which the runnable will be executed
         */
        void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit);
+
+       /**
+        * Execute the given runnable in the executor of the RPC service. This 
method can be used to run
+        * code outside of the main thread of a {@link RpcEndpoint}.
+        *
+        * <p><b>IMPORTANT:</b> This executor does not isolate the method 
invocations against
+        * any concurrent invocations and is therefore not suitable to run 
completion methods of futures
+        * that modify state of an {@link RpcEndpoint}. For such operations, 
one needs to use the
+        * {@link RpcEndpoint#getMainThreadExecutor() 
MainThreadExecutionContext} of that
+        * {@code RpcEndpoint}.
+        *
+        * @param runnable to execute
+        */
+       void execute(Runnable runnable);
+
+       /**
+        * Execute the given callable and return its result as a {@link 
Future}. This method can be used
+        * to run code outside of the main thread of a {@link RpcEndpoint}.
+        *
+        * <p><b>IMPORTANT:</b> This executor does not isolate the method 
invocations against
+        * any concurrent invocations and is therefore not suitable to run 
completion methods of futures
+        * that modify state of an {@link RpcEndpoint}. For such operations, 
one needs to use the
+        * {@link RpcEndpoint#getMainThreadExecutor() 
MainThreadExecutionContext} of that
+        * {@code RpcEndpoint}.
+        *
+        * @param callable to execute
+        * @param <T> is the return value type
+        * @return Future containing the callable's future result
+        */
+       <T> Future<T> execute(Callable<T> callable);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index 36f1115..cee19c4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -25,6 +25,7 @@ import akka.actor.ActorSystem;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
+import akka.dispatch.Futures;
 import akka.dispatch.Mapper;
 
 import akka.pattern.Patterns;
@@ -48,6 +49,7 @@ import java.lang.reflect.InvocationHandler;
 import java.lang.reflect.Proxy;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
 
@@ -223,4 +225,16 @@ public class AkkaRpcService implements RpcService {
 
                actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, 
unit), runnable, actorSystem.dispatcher());
        }
+
+       @Override
+       public void execute(Runnable runnable) {
+               actorSystem.dispatcher().execute(runnable);
+       }
+
+       @Override
+       public <T> Future<T> execute(Callable<T> callable) {
+               scala.concurrent.Future<T> scalaFuture = 
Futures.future(callable, actorSystem.dispatcher());
+
+               return new FlinkFuture<>(scalaFuture);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index 7c6b0ee..e8255d4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.rpc;
 
 import akka.actor.ActorSystem;
-import akka.util.Timeout;
 
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;

http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
index 957453a..c58ea20 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -65,6 +65,22 @@ public class TestingSerialRpcService implements RpcService {
        }
 
        @Override
+       public void execute(Runnable runnable) {
+               runnable.run();
+       }
+
+       @Override
+       public <T> Future<T> execute(Callable<T> callable) {
+               try {
+                       T result = callable.call();
+
+                       return FlinkCompletableFuture.completed(result);
+               } catch (Exception e) {
+                       return FlinkCompletableFuture.completedExceptionally(e);
+               }
+       }
+
+       @Override
        public Executor getExecutor() {
                return executorService;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
index 4e9e518..5550cb5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java
@@ -22,13 +22,18 @@ import akka.actor.ActorSystem;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
 import org.junit.Test;
 
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 public class AkkaRpcServiceTest extends TestLogger {
@@ -70,4 +75,44 @@ public class AkkaRpcServiceTest extends TestLogger {
 
                assertTrue("call was not properly delayed", ((stop - start) / 
1000000) >= delay);
        }
+
+       /**
+        * Tests that the {@link AkkaRpcService} can execute runnables
+        */
+       @Test
+       public void testExecuteRunnable() throws Exception {
+               final OneShotLatch latch = new OneShotLatch();
+
+               akkaRpcService.execute(new Runnable() {
+                       @Override
+                       public void run() {
+                               latch.trigger();
+                       }
+               });
+
+               latch.await(30L, TimeUnit.SECONDS);
+       }
+
+       /**
+        * Tests that the {@link AkkaRpcService} can execute callables and 
returns their result as
+        * a {@link Future}.
+        */
+       @Test
+       public void testExecuteCallable() throws InterruptedException, 
ExecutionException, TimeoutException {
+               final OneShotLatch latch = new OneShotLatch();
+               final int expected = 42;
+
+               Future<Integer> result = akkaRpcService.execute(new 
Callable<Integer>() {
+                       @Override
+                       public Integer call() throws Exception {
+                               latch.trigger();
+                               return expected;
+                       }
+               });
+
+               int actual = result.get(30L, TimeUnit.SECONDS);
+
+               assertEquals(expected, actual);
+               assertTrue(latch.isTriggered());
+       }
 }

Reply via email to