Repository: flink
Updated Branches:
  refs/heads/flip-6 fdeda082f -> 3cda59339


[hotfix] [rpc] Add RpcConnectionTest to validate that connection buildup fails 
fast when endpoint is unreachable.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/629078ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/629078ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/629078ee

Branch: refs/heads/flip-6
Commit: 629078ee3f5fcecd5498a81abaf8c99f9e614b02
Parents: fdeda08
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Sep 21 13:03:17 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Sep 23 19:44:13 2016 +0200

----------------------------------------------------------------------
 .../flink/runtime/rpc/AsyncCallsTest.java       |  4 +-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 14 ++--
 .../flink/runtime/rpc/RpcConnectionTest.java    | 86 ++++++++++++++++++++
 3 files changed, 96 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
index e8255d4..7affdb9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/AsyncCallsTest.java
@@ -43,9 +43,9 @@ public class AsyncCallsTest extends TestLogger {
        //  shared test members
        // 
------------------------------------------------------------------------
 
-       private static ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
+       private static final ActorSystem actorSystem = 
AkkaUtils.createDefaultActorSystem();
 
-       private static AkkaRpcService akkaRpcService =
+       private static final AkkaRpcService akkaRpcService =
                        new AkkaRpcService(actorSystem, 
Time.milliseconds(10000L));
 
        @AfterClass

http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index ee3f784..53355e8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -30,6 +30,7 @@ import org.reflections.Reflections;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -69,7 +70,8 @@ public class RpcCompletenessTest extends TestLogger {
 
        @SuppressWarnings("rawtypes")
        private void checkCompleteness(Class<? extends RpcEndpoint> 
rpcEndpoint, Class<? extends RpcGateway> rpcGateway) {
-               Method[] gatewayMethods = 
getRpcMethodsFromGateway(rpcGateway).toArray(new Method[0]);
+               List<Method> rpcMethodsFromGateway = 
getRpcMethodsFromGateway(rpcGateway);
+               Method[] gatewayMethods = rpcMethodsFromGateway.toArray(new 
Method[rpcMethodsFromGateway.size()]);
                Method[] serverMethods = rpcEndpoint.getMethods();
 
                Map<String, Set<Method>> rpcMethods = new HashMap<>();
@@ -360,13 +362,13 @@ public class RpcCompletenessTest extends TestLogger {
                }
 
                // Get all methods declared in current interface
-               for(Method method : interfaceClass.getDeclaredMethods()) {
-                       allMethods.add(method);
-               }
+               Collections.addAll(allMethods, 
interfaceClass.getDeclaredMethods());
 
                // Get all method inherited from super interface
-               for(Class superClass : interfaceClass.getInterfaces()) {
-                       allMethods.addAll(getRpcMethodsFromGateway(superClass));
+               for (Class<?> superClass : interfaceClass.getInterfaces()) {
+                       @SuppressWarnings("unchecked")
+                       Class<? extends RpcGateway> gatewayClass = (Class<? 
extends RpcGateway>) superClass;
+                       
allMethods.addAll(getRpcMethodsFromGateway(gatewayClass));
                }
                return allMethods;
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/629078ee/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
new file mode 100644
index 0000000..6363662
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcConnectionTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.actor.ActorSystem;
+import akka.util.Timeout;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
+import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import org.junit.AfterClass;
+import org.junit.Test;
+
+import scala.Option;
+import scala.Tuple2;
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.*;
+
+/**
+ * This test validates that the RPC service fails fast, and with a descriptive
+ * {@link RpcConnectionException}, when it cannot connect to an RpcEndpoint.
+ */
+public class RpcConnectionTest {
+
+       /**
+        * Connects to an address that is expected to be unreachable and checks that the
+        * returned future fails with an {@link RpcConnectionException} whose message
+        * contains that address. Completing normally, timing out, or failing with any
+        * other exception type fails the test.
+        */
+       @Test
+       public void testConnectFailure() {
+               final String address = "foo.bar.com.test.invalid";
+
+               ActorSystem actorSystem = null;
+               RpcService rpcService = null;
+               try {
+                       actorSystem = AkkaUtils.createActorSystem(
+                                       new Configuration(), Option.apply(new Tuple2<String, Object>("localhost", 0)));
+
+                       // we start the RPC service with a very long timeout to ensure that the test
+                       // can only pass if the connection problem is not recognized merely via a timeout
+                       rpcService = new AkkaRpcService(actorSystem, new Timeout(10000000, TimeUnit.SECONDS));
+
+                       Future<TaskExecutorGateway> future = rpcService.connect(address, TaskExecutorGateway.class);
+
+                       Await.result(future, new FiniteDuration(10000000, TimeUnit.SECONDS));
+                       fail("should never complete normally");
+               }
+               catch (AssertionError e) {
+                       // raised by fail() above; rethrow it unchanged so that the generic handler
+                       // below does not re-report it as "wrong exception" and hide the real cause
+                       throw e;
+               }
+               catch (TimeoutException e) {
+                       fail("should not fail with a generic timeout exception");
+               }
+               catch (RpcConnectionException e) {
+                       // that is what we want
+                       String message = e.getMessage();
+                       // guard against a null message so the check fails as an assertion, not as an NPE
+                       assertTrue("wrong error message", message != null && message.contains(address));
+               }
+               catch (Throwable t) {
+                       // keep the unexpected exception as the cause so that its stack trace is reported
+                       throw new AssertionError("wrong exception: " + t, t);
+               }
+               finally {
+                       if (rpcService != null) {
+                               rpcService.stopService();
+                       }
+                       if (actorSystem != null) {
+                               actorSystem.shutdown();
+                       }
+               }
+       }
+}

Reply via email to