Repository: flink
Updated Branches:
  refs/heads/flip-6 fdeda082f -> 3cda59339
[hotfix] [rpc] Add RpcConnectionTest to validate that connection buildup fails fast when endpoint is unreachable. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/629078ee Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/629078ee Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/629078ee Branch: refs/heads/flip-6 Commit: 629078ee3f5fcecd5498a81abaf8c99f9e614b02 Parents: fdeda08 Author: Stephan Ewen <se...@apache.org> Authored: Wed Sep 21 13:03:17 2016 +0200 Committer: Stephan Ewen <se...@apache.org> Committed: Fri Sep 23 19:44:13 2016 +0200 ---------------------------------------------------------------------- .../flink/runtime/rpc/AsyncCallsTest.java | 4 +- .../flink/runtime/rpc/RpcCompletenessTest.java | 14 ++-- .../flink/runtime/rpc/RpcConnectionTest.java | 86 ++++++++++++++++++++ 3 files changed, 96 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java index e8255d4..7affdb9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java @@ -43,9 +43,9 @@ public class AsyncCallsTest extends TestLogger { // shared test members // ------------------------------------------------------------------------ - private static ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); + private static final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem(); - private static AkkaRpcService akkaRpcService = + private static final AkkaRpcService 
akkaRpcService = new AkkaRpcService(actorSystem, Time.milliseconds(10000L)); @AfterClass http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java index ee3f784..53355e8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java @@ -30,6 +30,7 @@ import org.reflections.Reflections; import java.lang.annotation.Annotation; import java.lang.reflect.Method; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -69,7 +70,8 @@ public class RpcCompletenessTest extends TestLogger { @SuppressWarnings("rawtypes") private void checkCompleteness(Class<? extends RpcEndpoint> rpcEndpoint, Class<? 
extends RpcGateway> rpcGateway) { - Method[] gatewayMethods = getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]); + List<Method> rpcMethodsFromGateway = getRpcMethodsFromGateway(rpcGateway); + Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new Method[rpcMethodsFromGateway.size()]); Method[] serverMethods = rpcEndpoint.getMethods(); Map<String, Set<Method>> rpcMethods = new HashMap<>(); @@ -360,13 +362,13 @@ public class RpcCompletenessTest extends TestLogger { } // Get all methods declared in current interface - for(Method method : interfaceClass.getDeclaredMethods()) { - allMethods.add(method); - } + Collections.addAll(allMethods, interfaceClass.getDeclaredMethods()); // Get all method inherited from super interface - for(Class superClass : interfaceClass.getInterfaces()) { - allMethods.addAll(getRpcMethodsFromGateway(superClass)); + for (Class<?> superClass : interfaceClass.getInterfaces()) { + @SuppressWarnings("unchecked") + Class<? extends RpcGateway> gatewayClass = (Class<? extends RpcGateway>) superClass; + allMethods.addAll(getRpcMethodsFromGateway(gatewayClass)); } return allMethods; } http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java new file mode 100644 index 0000000..6363662 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rpc; + +import akka.actor.ActorSystem; +import akka.util.Timeout; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +import org.junit.AfterClass; +import org.junit.Test; + +import scala.Option; +import scala.Tuple2; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.FiniteDuration; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.Assert.*; + +/** + * This test validates that the RPC service gives a good message when it cannot + * connect to an RpcEndpoint. 
+ */ +public class RpcConnectionTest { + + @Test + public void testConnectFailure() { + ActorSystem actorSystem = null; + RpcService rpcService = null; + try { + actorSystem = AkkaUtils.createActorSystem( + new Configuration(), Option.apply(new Tuple2<String, Object>("localhost", 0))); + + // we start the RPC service with a very long timeout to ensure that the test + // can only pass if the connection problem is not recognized merely via a timeout + rpcService = new AkkaRpcService(actorSystem, new Timeout(10000000, TimeUnit.SECONDS)); + + Future<TaskExecutorGateway> future = rpcService.connect("foo.bar.com.test.invalid", TaskExecutorGateway.class); + + Await.result(future, new FiniteDuration(10000000, TimeUnit.SECONDS)); + fail("should never complete normally"); + } + catch (TimeoutException e) { + fail("should not fail with a generic timeout exception"); + } + catch (RpcConnectionException e) { + // that is what we want + assertTrue("wrong error message", e.getMessage().contains("foo.bar.com.test.invalid")); + } + catch (Throwable t) { + fail("wrong exception: " + t); + } + finally { + if (rpcService != null) { + rpcService.stopService(); + } + if (actorSystem != null) { + actorSystem.shutdown(); + } + } + } +}