[FLINK-4703] RpcCompletenessTest: Add support for type arguments and subclasses

This closes #2561


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e58ebf2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e58ebf2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e58ebf2

Branch: refs/heads/flip-6
Commit: 6e58ebf22cb11631438ea51118615053e11cbcdb
Parents: 415af17
Author: Maximilian Michels <m...@apache.org>
Authored: Wed Sep 28 12:39:30 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Oct 14 15:14:42 2016 +0200

----------------------------------------------------------------------
 .../apache/flink/runtime/rpc/RpcEndpoint.java   | 23 +++++-
 .../flink/runtime/rpc/RpcCompletenessTest.java  | 80 ++++++++++++++++++--
 2 files changed, 94 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e58ebf2/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
index 4e5e49a..79961f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcEndpoint.java
@@ -85,9 +85,9 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
 
                // IMPORTANT: Don't change order of selfGatewayType and self 
because rpcService.startServer
                // requires that selfGatewayType has been initialized
-               this.selfGatewayType = 
ReflectionUtil.getTemplateType1(getClass());
+               this.selfGatewayType = determineSelfGatewayType();
                this.self = rpcService.startServer(this);
-               
+
                this.mainThreadExecutor = new 
MainThreadExecutor((MainThreadExecutable) self);
        }
 
@@ -255,4 +255,23 @@ public abstract class RpcEndpoint<C extends RpcGateway> {
                        gateway.runAsync(runnable);
                }
        }
+
+       /**
+        * Determines the self gateway type specified in one of the subclasses which extend this class.
+        * May traverse multiple class hierarchies until a Gateway type is found as a first type argument.
+        *
+        * <p>Starting at the runtime class returned by {@code getClass()}, each iteration asks
+        * {@code ReflectionUtil.getTemplateType1} for a type (presumably the first type argument of
+        * the class's generic superclass - verify against ReflectionUtil) and then steps to the
+        * superclass. The loop ends as soon as the returned type is assignable to {@link RpcGateway}.
+        *
+        * <p>NOTE(review): the loop has no explicit guard for the case that no class in the
+        * hierarchy yields an {@code RpcGateway} type; {@code c} would then eventually become
+        * {@code null} ({@code Object} has no superclass) and be passed on to
+        * {@code getTemplateType1} - confirm how that helper reacts.
+        *
+        * @return {@code Class<C>} The determined self gateway type
+        */
+       private Class<C> determineSelfGatewayType() {
+
+               // determine self gateway type
+               Class c = getClass();
+               Class<C> determinedSelfGatewayType;
+               do {
+                       determinedSelfGatewayType = 
ReflectionUtil.getTemplateType1(c);
+                       // check if super class contains self gateway type in next loop
+                       c = c.getSuperclass();
+               } while 
(!RpcGateway.class.isAssignableFrom(determinedSelfGatewayType));
+
+               return determinedSelfGatewayType;
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e58ebf2/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
index 53355e8..e7143ae 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/RpcCompletenessTest.java
@@ -26,9 +26,14 @@ import org.apache.flink.util.ReflectionUtil;
 import org.apache.flink.util.TestLogger;
 import org.junit.Test;
 import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.lang.annotation.Annotation;
 import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.lang.reflect.Type;
+import java.lang.reflect.TypeVariable;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -41,8 +46,33 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+/**
+ * Test which ensures that all classes of subtype {@link RpcEndpoint} implement
+ * the methods specified in the generic gateway type argument.
+ *
+ * {@code
+ *         RpcEndpoint<GatewayTypeParameter extends RpcGateway>
+ * }
+ *
+ * Note, that the class hierarchy can also be nested. In this case the type argument
+ * always has to be the first argument, e.g. {@code
+ *
+ *         // RpcClass needs to implement RpcGatewayClass' methods
+ *         RpcClass extends RpcEndpoint<RpcGatewayClass>
+ *
+ *         // RpcClass2 or its subclass needs to implement RpcGatewayClass' methods
+ *      RpcClass<GatewayTypeParameter extends RpcGateway,...> extends RpcEndpoint<GatewayTypeParameter>
+ *      RpcClass2 extends RpcClass<RpcGatewayClass,...>
+ *
+ *      // needless to say, this can even be nested further
+ *      ...
+ * }
+ *
+ */
 public class RpcCompletenessTest extends TestLogger {
 
+       private static Logger LOG = 
LoggerFactory.getLogger(RpcCompletenessTest.class);
+
        private static final Class<?> futureClass = Future.class;
        private static final Class<?> timeoutClass = Time.class;
 
@@ -55,16 +85,52 @@ public class RpcCompletenessTest extends TestLogger {
 
                Class<? extends RpcEndpoint> c;
 
-               for (Class<? extends RpcEndpoint> rpcEndpoint :classes){
+               mainloop:
+               for (Class<? extends RpcEndpoint> rpcEndpoint : classes) {
                        c = rpcEndpoint;
 
-                       Class<?> rpcGatewayType = 
ReflectionUtil.getTemplateType1(c);
+                       LOG.debug("-------------");
+                       LOG.debug("c: {}", c);
 
-                       if (rpcGatewayType != null) {
-                               checkCompleteness(rpcEndpoint, (Class<? extends 
RpcGateway>) rpcGatewayType);
-                       } else {
-                               fail("Could not retrieve the rpc gateway class 
for the given rpc endpoint class " + rpcEndpoint.getName());
+                       // skip abstract classes
+                       if (Modifier.isAbstract(c.getModifiers())) {
+                               LOG.debug("Skipping abstract class");
+                               continue;
                        }
+
+                       // check for type parameter bound to RpcGateway
+                       // skip if one is found because a subclass will provide 
the concrete argument
+                       TypeVariable<? extends Class<? extends RpcEndpoint>>[] 
typeParameters = c.getTypeParameters();
+                       LOG.debug("Checking {} parameters.", 
typeParameters.length);
+                       for (int i = 0; i < typeParameters.length; i++) {
+                               for (Type bound : 
typeParameters[i].getBounds()) {
+                                       LOG.debug("checking bound {} of type 
parameter {}", bound, typeParameters[i]);
+                                       if (bound.toString().equals("interface 
" + RpcGateway.class.getName())) {
+                                               if (i > 0) {
+                                                       fail("Type parameter 
for RpcGateway should come first in " + c);
+                                               }
+                                               LOG.debug("Skipping class with 
type parameter bound to RpcGateway.");
+                                               // Type parameter is bound to 
RpcGateway which a subclass will provide
+                                               continue mainloop;
+                                       }
+                               }
+                       }
+
+                       // check if this class or any super class contains the 
RpcGateway argument
+                       Class<?> rpcGatewayType;
+                       do {
+                               LOG.debug("checking type argument of class: 
{}", c);
+                               rpcGatewayType = 
ReflectionUtil.getTemplateType1(c);
+                               LOG.debug("type argument is: {}", 
rpcGatewayType);
+
+                               c = (Class<? extends RpcEndpoint>) 
c.getSuperclass();
+
+                       } while 
(!RpcGateway.class.isAssignableFrom(rpcGatewayType));
+
+                       LOG.debug("Checking RRC completeness of endpoint '{}' 
with gateway '{}'",
+                               rpcEndpoint.getSimpleName(), 
rpcGatewayType.getSimpleName());
+
+                       checkCompleteness(rpcEndpoint, (Class<? extends 
RpcGateway>) rpcGatewayType);
                }
        }
 
@@ -352,7 +418,7 @@ public class RpcCompletenessTest extends TestLogger {
         */
        private List<Method> getRpcMethodsFromGateway(Class<? extends 
RpcGateway> interfaceClass) {
                if(!interfaceClass.isInterface()) {
-                       fail(interfaceClass.getName() + "is not a interface");
+                       fail(interfaceClass.getName() + " is not a interface");
                }
 
                ArrayList<Method> allMethods = new ArrayList<>();

Reply via email to