[FLINK-4658] [rpc] Allow RpcService to execute Runnables and Callables in its executor
This closes #2531. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f4dc4741 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f4dc4741 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f4dc4741 Branch: refs/heads/flip-6 Commit: f4dc47411fc06b5f3ee0ff34c2ebf02bd459ffa8 Parents: 507e86c Author: Till Rohrmann <trohrm...@apache.org> Authored: Wed Sep 21 18:16:27 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Oct 14 15:14:40 2016 +0200 ---------------------------------------------------------------------- .../apache/flink/runtime/rpc/RpcService.java | 31 ++++++++++++++ .../flink/runtime/rpc/akka/AkkaRpcService.java | 14 ++++++ .../flink/runtime/rpc/AsyncCallsTest.java | 1 - .../runtime/rpc/TestingSerialRpcService.java | 16 +++++++ .../runtime/rpc/akka/AkkaRpcServiceTest.java | 45 ++++++++++++++++++++ 5 files changed, 106 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java index a367ff2..437e08b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.rpc; import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -89,4 +90,34 @@ public interface RpcService { * @param delay The delay after which the runnable will be executed */ void scheduleRunnable(Runnable runnable, long delay, TimeUnit unit); + + /** + * Execute the given runnable in the executor of the RPC service. This method can be used to run + * code outside of the main thread of a {@link RpcEndpoint}. + * + * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against + * any concurrent invocations and is therefore not suitable to run completion methods of futures + * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the + * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that + * {@code RpcEndpoint}. + * + * @param runnable to execute + */ + void execute(Runnable runnable); + + /** + * Execute the given callable and return its result as a {@link Future}. This method can be used + * to run code outside of the main thread of a {@link RpcEndpoint}. + * + * <p><b>IMPORTANT:</b> This executor does not isolate the method invocations against + * any concurrent invocations and is therefore not suitable to run completion methods of futures + * that modify state of an {@link RpcEndpoint}. For such operations, one needs to use the + * {@link RpcEndpoint#getMainThreadExecutor() MainThreadExecutionContext} of that + * {@code RpcEndpoint}. + * + * @param callable to execute + * @param <T> is the return value type + * @return Future containing the callable's future result + */ + <T> Future<T> execute(Callable<T> callable); } http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java index 36f1115..cee19c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java @@ -25,6 +25,7 @@ import akka.actor.ActorSystem; import akka.actor.Identify; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.dispatch.Futures; import akka.dispatch.Mapper; import akka.pattern.Patterns; @@ -48,6 +49,7 @@ import java.lang.reflect.InvocationHandler; import java.lang.reflect.Proxy; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.Callable; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; @@ -223,4 +225,16 @@ public class AkkaRpcService implements RpcService { actorSystem.scheduler().scheduleOnce(new FiniteDuration(delay, unit), runnable, actorSystem.dispatcher()); } + + @Override + public void execute(Runnable runnable) { + actorSystem.dispatcher().execute(runnable); + } + + @Override + public <T> Future<T> execute(Callable<T> callable) { + scala.concurrent.Future<T> scalaFuture = Futures.future(callable, actorSystem.dispatcher()); + + return new FlinkFuture<>(scalaFuture); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index 7c6b0ee..e8255d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -19,7 +19,6 @@ package org.apache.flink.runtime.rpc; import akka.actor.ActorSystem; -import akka.util.Timeout; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java index 957453a..c58ea20 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java @@ -65,6 +65,22 @@ public class TestingSerialRpcService implements RpcService { } @Override + public void execute(Runnable runnable) { + runnable.run(); + } + + @Override + public <T> Future<T> execute(Callable<T> callable) { + try { + T result = callable.call(); + + return FlinkCompletableFuture.completed(result); + } catch (Exception e) { + return FlinkCompletableFuture.completedExceptionally(e); + } + } + + @Override public Executor getExecutor() { return executorService; } http://git-wip-us.apache.org/repos/asf/flink/blob/f4dc4741/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java index 4e9e518..5550cb5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/akka/AkkaRpcServiceTest.java @@ -22,13 +22,18 @@ import akka.actor.ActorSystem; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.Future; import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.Test; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; public class AkkaRpcServiceTest extends TestLogger { @@ -70,4 +75,44 @@ public class AkkaRpcServiceTest extends TestLogger { assertTrue("call was not properly delayed", ((stop - start) / 1000000) >= delay); } + + /** + * Tests that the {@link AkkaRpcService} can execute runnables + */ + @Test + public void testExecuteRunnable() throws Exception { + final OneShotLatch latch = new OneShotLatch(); + + akkaRpcService.execute(new Runnable() { + @Override + public void run() { + latch.trigger(); + } + }); + + latch.await(30L, TimeUnit.SECONDS); + } + + /** + * Tests that the {@link AkkaRpcService} can execute callables and returns their result as + * a {@link Future}. + */ + @Test + public void testExecuteCallable() throws InterruptedException, ExecutionException, TimeoutException { + final OneShotLatch latch = new OneShotLatch(); + final int expected = 42; + + Future<Integer> result = akkaRpcService.execute(new Callable<Integer>() { + @Override + public Integer call() throws Exception { + latch.trigger(); + return expected; + } + }); + + int actual = result.get(30L, TimeUnit.SECONDS); + + assertEquals(expected, actual); + assertTrue(latch.isTriggered()); + } }