[FLINK-4516] leader election of resourcemanager

- add a serial RPC service
- add a special RpcService implementation that directly executes 
asynchronous calls serially, one by one; it is intended for test cases only
- change the ResourceManagerLeaderContender and TestingSerialRpcService code
- override the shutdown logic to stop the LeaderElectionService
- use a mocked RpcService rather than TestingSerialRpcService for the 
ResourceManager HA test

This closes #2427


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/454bf51b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/454bf51b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/454bf51b

Branch: refs/heads/flip-6
Commit: 454bf51b063e30075f08d3e400e4db2bf416b969
Parents: c4abfd9
Author: beyond1920 <beyond1...@126.com>
Authored: Sat Aug 27 14:14:28 2016 +0800
Committer: Maximilian Michels <m...@apache.org>
Committed: Wed Aug 31 17:17:47 2016 +0200

----------------------------------------------------------------------
 .../HighAvailabilityServices.java               |   7 +
 .../runtime/highavailability/NonHaServices.java |   5 +
 .../rpc/resourcemanager/ResourceManager.java    | 111 +++++-
 .../TestingHighAvailabilityServices.java        |  19 +-
 .../runtime/rpc/TestingSerialRpcService.java    | 369 +++++++++++++++++++
 .../resourcemanager/ResourceManagerHATest.java  |  76 ++++
 6 files changed, 578 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
index 73e4f1f..298147c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/HighAvailabilityServices.java
@@ -40,6 +40,13 @@ public interface HighAvailabilityServices {
        LeaderRetrievalService getResourceManagerLeaderRetriever() throws 
Exception;
 
        /**
+        * Gets the leader election service for the cluster's resource manager.
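+        * @return the leader election service for the resource manager
+        * @throws Exception if the leader election service cannot be provided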
+        */
+       LeaderElectionService getResourceManagerLeaderElectionService() throws 
Exception;
+
+       /**
         * Gets the leader election service for the given job.
         *
         * @param jobID The identifier of the job running the election.

http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
index 3d2769b..292a404 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/NonHaServices.java
@@ -61,6 +61,11 @@ public class NonHaServices implements 
HighAvailabilityServices {
        }
 
        @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+               return new StandaloneLeaderElectionService();
+       }
+
+       @Override
        public LeaderElectionService getJobMasterLeaderElectionService(JobID 
jobID) throws Exception {
                return new StandaloneLeaderElectionService();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
index 6f34465..f7147c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManager.java
@@ -20,24 +20,26 @@ package org.apache.flink.runtime.rpc.resourcemanager;
 
 import akka.dispatch.Mapper;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
+import org.apache.flink.runtime.leaderelection.LeaderContender;
+import org.apache.flink.runtime.leaderelection.LeaderElectionService;
 import org.apache.flink.runtime.rpc.RpcMethod;
 import org.apache.flink.runtime.rpc.RpcEndpoint;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.jobmaster.JobMaster;
 import org.apache.flink.runtime.rpc.jobmaster.JobMasterGateway;
 import 
org.apache.flink.runtime.rpc.taskexecutor.TaskExecutorRegistrationSuccess;
-import org.apache.flink.util.Preconditions;
 
-import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.Future;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
-import java.util.concurrent.ExecutorService;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
  * ResourceManager implementation. The resource manager is responsible for 
resource de-/allocation
@@ -50,16 +52,51 @@ import java.util.concurrent.ExecutorService;
  * </ul>
  */
 public class ResourceManager extends RpcEndpoint<ResourceManagerGateway> {
-       private final ExecutionContext executionContext;
        private final Map<JobMasterGateway, InstanceID> jobMasterGateways;
+       private final HighAvailabilityServices highAvailabilityServices;
+       private LeaderElectionService leaderElectionService = null;
+       private UUID leaderSessionID = null;
 
-       public ResourceManager(RpcService rpcService, ExecutorService 
executorService) {
+       public ResourceManager(RpcService rpcService, HighAvailabilityServices 
highAvailabilityServices) {
                super(rpcService);
-               this.executionContext = ExecutionContext$.MODULE$.fromExecutor(
-                       Preconditions.checkNotNull(executorService));
+               this.highAvailabilityServices = 
checkNotNull(highAvailabilityServices);
                this.jobMasterGateways = new HashMap<>();
        }
 
+       @Override
+       public void start() {
+               // start the RPC endpoint, then join the leader election
+               try {
+                       super.start();
+                       leaderElectionService = 
highAvailabilityServices.getResourceManagerLeaderElectionService();
+                       leaderElectionService.start(new 
ResourceManagerLeaderContender());
+               } catch (Throwable e) {
+                       log.error("A fatal error happened when starting the 
ResourceManager", e);
+                       throw new RuntimeException("A fatal error happened when 
starting the ResourceManager", e);
+               }
+       }
+
+       @Override
+       public void shutDown() {
+               try {
+                       leaderElectionService.stop();
+                       super.shutDown();
+               } catch(Throwable e) {
+                       log.error("A fatal error happened when shutdown the 
ResourceManager", e);
+                       throw new RuntimeException("A fatal error happened when 
shutdown the ResourceManager", e);
+               }
+       }
+
+       /**
+        * Gets the leader session id of current resourceManager.
+        *
+        * @return the leader session ID of this ResourceManager; null until 
leadership is granted and again after leadership is revoked.
+        */
+       @VisibleForTesting
+       UUID getLeaderSessionID() {
+               return leaderSessionID;
+       }
+
        /**
         * Register a {@link JobMaster} at the resource manager.
         *
@@ -116,4 +153,62 @@ public class ResourceManager extends 
RpcEndpoint<ResourceManagerGateway> {
 
                return new TaskExecutorRegistrationSuccess(new InstanceID(), 
5000);
        }
+
+       private class ResourceManagerLeaderContender implements LeaderContender 
{
+
+               /**
+                * Callback method invoked when the current ResourceManager is 
granted leadership.
+                *
+                * @param leaderSessionID unique leadershipID
+                */
+               @Override
+               public void grantLeadership(final UUID leaderSessionID) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("ResourceManager {} was 
granted leadership with leader session ID {}", getAddress(), leaderSessionID);
+                                       ResourceManager.this.leaderSessionID = 
leaderSessionID;
+                                       // confirming the leader session ID 
might be blocking,
+                                       
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
+                               }
+                       });
+               }
+
+               /**
+                * Callback method invoked when the current ResourceManager loses leadership.
+                */
+               @Override
+               public void revokeLeadership() {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.info("ResourceManager {} was 
revoked leadership.", getAddress());
+                                       jobMasterGateways.clear();
+                                       leaderSessionID = null;
+                               }
+                       });
+               }
+
+               @Override
+               public String getAddress() {
+                       return ResourceManager.this.getAddress();
+               }
+
+               /**
+                * Handles error occurring in the leader election service
+                *
+                * @param exception Exception being thrown in the leader 
election service
+                */
+               @Override
+               public void handleError(final Exception exception) {
+                       runAsync(new Runnable() {
+                               @Override
+                               public void run() {
+                                       log.error("ResourceManager received an 
error from the LeaderElectionService.", exception);
+                                       // terminate ResourceManager in case of 
an error
+                                       shutDown();
+                               }
+                       });
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
index 4d654a3..3162f40 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/highavailability/TestingHighAvailabilityServices.java
@@ -32,6 +32,8 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
 
        private volatile LeaderElectionService jobMasterLeaderElectionService;
 
+       private volatile LeaderElectionService 
resourceManagerLeaderElectionService;
+
 
        // 
------------------------------------------------------------------------
        //  Setters for mock / testing implementations
@@ -44,7 +46,11 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
        public void setJobMasterLeaderElectionService(LeaderElectionService 
leaderElectionService) {
                this.jobMasterLeaderElectionService = leaderElectionService;
        }
-       
+
+       public void 
setResourceManagerLeaderElectionService(LeaderElectionService 
leaderElectionService) {
+               this.resourceManagerLeaderElectionService = 
leaderElectionService;
+       }
+
        // 
------------------------------------------------------------------------
        //  HA Services Methods
        // 
------------------------------------------------------------------------
@@ -69,4 +75,15 @@ public class TestingHighAvailabilityServices implements 
HighAvailabilityServices
                        throw new 
IllegalStateException("JobMasterLeaderElectionService has not been set");
                }
        }
+
+       @Override
+       public LeaderElectionService getResourceManagerLeaderElectionService() 
throws Exception {
+               LeaderElectionService service = 
resourceManagerLeaderElectionService;
+
+               if (service != null) {
+                       return service;
+               } else {
+                       throw new 
IllegalStateException("ResourceManagerLeaderElectionService has not been set");
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
new file mode 100644
index 0000000..7bdbb99
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc;
+
+import akka.dispatch.ExecutionContexts;
+import akka.dispatch.Futures;
+import akka.util.Timeout;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.util.DirectExecutorService;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.Await;
+import scala.concurrent.ExecutionContext;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.lang.annotation.Annotation;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.util.BitSet;
+import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+
+/**
+ * An RPC service implementation for testing. This RPC service executes all 
asynchronous calls synchronously, one by one, in the calling thread.
+ */
+public class TestingSerialRpcService implements RpcService {
+
+       private final DirectExecutorService executorService;
+       private final ConcurrentHashMap<String, RpcGateway> 
registeredConnections;
+
+       public TestingSerialRpcService() {
+               executorService = new DirectExecutorService();
+               this.registeredConnections = new ConcurrentHashMap<>();
+       }
+
+       @Override
+       public void scheduleRunnable(final Runnable runnable, final long delay, 
final TimeUnit unit) {
+               try {
+                       unit.sleep(delay);
+                       runnable.run();
+               } catch (Throwable e) {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       @Override
+       public ExecutionContext getExecutionContext() {
+               return ExecutionContexts.fromExecutorService(executorService);
+       }
+
+       @Override
+       public void stopService() {
+               executorService.shutdown();
+               registeredConnections.clear();
+       }
+
+       @Override
+       public void stopServer(RpcGateway selfGateway) {
+
+       }
+
+       @Override
+       public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S 
rpcEndpoint) {
+               final String address = UUID.randomUUID().toString();
+
+               InvocationHandler akkaInvocationHandler = new 
TestingSerialInvocationHandler(address, rpcEndpoint);
+               ClassLoader classLoader = getClass().getClassLoader();
+
+               @SuppressWarnings("unchecked")
+               C self = (C) Proxy.newProxyInstance(
+                       classLoader,
+                       new Class<?>[]{
+                               rpcEndpoint.getSelfGatewayType(),
+                               MainThreadExecutor.class,
+                               StartStoppable.class,
+                               RpcGateway.class
+                       },
+                       akkaInvocationHandler);
+
+               return self;
+       }
+
+       @Override
+       public <C extends RpcGateway> Future<C> connect(String address, 
Class<C> clazz) {
+               RpcGateway gateway = registeredConnections.get(address);
+
+               if (gateway != null) {
+                       if (clazz.isAssignableFrom(gateway.getClass())) {
+                               @SuppressWarnings("unchecked")
+                               C typedGateway = (C) gateway;
+                               return Futures.successful(typedGateway);
+                       } else {
+                               return Futures.failed(
+                                       new Exception("Gateway registered under 
" + address + " is not of type " + clazz));
+                       }
+               } else {
+                       return Futures.failed(new Exception("No gateway 
registered under that name"));
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       // connections
+       // 
------------------------------------------------------------------------
+
+       public void registerGateway(String address, RpcGateway gateway) {
+               checkNotNull(address);
+               checkNotNull(gateway);
+
+               if (registeredConnections.putIfAbsent(address, gateway) != 
null) {
+                       throw new IllegalStateException("a gateway is already 
registered under " + address);
+               }
+       }
+
+       private static class TestingSerialInvocationHandler<C extends 
RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, 
MainThreadExecutor, StartStoppable {
+
+               private final T rpcEndpoint;
+
+               /** default timeout for asks */
+               private final Timeout timeout;
+
+               private final String address;
+
+               private TestingSerialInvocationHandler(String address, T 
rpcEndpoint) {
+                       this(address, rpcEndpoint, new Timeout(new 
FiniteDuration(10, TimeUnit.SECONDS)));
+               }
+
+               private TestingSerialInvocationHandler(String address, T 
rpcEndpoint, Timeout timeout) {
+                       this.rpcEndpoint = rpcEndpoint;
+                       this.timeout = timeout;
+                       this.address = address;
+               }
+
+               @Override
+               public Object invoke(Object proxy, Method method, Object[] 
args) throws Throwable {
+                       Class<?> declaringClass = method.getDeclaringClass();
+                       if (declaringClass.equals(MainThreadExecutor.class) ||
+                               declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
+                               declaringClass.equals(RpcGateway.class)) {
+                               return method.invoke(this, args);
+                       } else {
+                               final String methodName = method.getName();
+                               Class<?>[] parameterTypes = 
method.getParameterTypes();
+                               Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
+                               Timeout futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
+
+                               final Tuple2<Class<?>[], Object[]> 
filteredArguments = filterArguments(
+                                       parameterTypes,
+                                       parameterAnnotations,
+                                       args);
+
+                               Class<?> returnType = method.getReturnType();
+
+                               if (returnType.equals(Future.class)) {
+                                       try {
+                                               Object result = 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);
+                                               return 
Futures.successful(result);
+                                       } catch (Throwable e) {
+                                               return Futures.failed(e);
+                                       }
+                               } else {
+                                       return 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);
+                               }
+                       }
+               }
+
+               /**
+                * Handle rpc invocations by looking up the rpc method on the 
rpc endpoint and calling this
+                * method with the provided method arguments. If the method has 
a return value, it is returned
+                * to the sender of the call.
+                */
+               private Object handleRpcInvocationSync(final String methodName,
+                       final Class<?>[] parameterTypes,
+                       final Object[] args,
+                       final Timeout futureTimeout) throws Exception {
+                       final Method rpcMethod = lookupRpcMethod(methodName, 
parameterTypes);
+                       Object result = rpcMethod.invoke(rpcEndpoint, args);
+
+                       if (result != null && result instanceof Future) {
+                               Future<?> future = (Future<?>) result;
+                               return Await.result(future, 
futureTimeout.duration());
+                       } else {
+                               return result;
+                       }
+               }
+
+               @Override
+               public void runAsync(Runnable runnable) {
+                       runnable.run();
+               }
+
+               @Override
+               public <V> Future<V> callAsync(Callable<V> callable, Timeout 
callTimeout) {
+                       try {
+                               return Futures.successful(callable.call());
+                       } catch (Throwable e) {
+                               return Futures.failed(e);
+                       }
+               }
+
+               @Override
+               public void scheduleRunAsync(final Runnable runnable, final 
long delay) {
+                       try {
+                               TimeUnit.MILLISECONDS.sleep(delay);
+                               runnable.run();
+                       } catch (Throwable e) {
+                               throw new RuntimeException(e);
+                       }
+               }
+
+               @Override
+               public String getAddress() {
+                       return address;
+               }
+
+               @Override
+               public void start() {
+                       // do nothing
+               }
+
+               @Override
+               public void stop() {
+                       // do nothing
+               }
+
+               /**
+                * Look up the rpc method on the given {@link RpcEndpoint} 
instance.
+                *
+                * @param methodName     Name of the method
+                * @param parameterTypes Parameter types of the method
+                * @return Method of the rpc endpoint
+                * @throws NoSuchMethodException Thrown if the method with the 
given name and parameter types
+                *                               cannot be found at the rpc 
endpoint
+                */
+               private Method lookupRpcMethod(final String methodName,
+                       final Class<?>[] parameterTypes) throws 
NoSuchMethodException {
+                       return rpcEndpoint.getClass().getMethod(methodName, 
parameterTypes);
+               }
+
+               // 
------------------------------------------------------------------------
+               //  Helper methods
+               // 
------------------------------------------------------------------------
+
+               /**
+                * Extracts the {@link RpcTimeout} annotated rpc timeout value 
from the list of given method
+                * arguments. If no {@link RpcTimeout} annotated parameter 
could be found, then the default
+                * timeout is returned.
+                *
+                * @param parameterAnnotations Parameter annotations
+                * @param args                 Array of arguments
+                * @param defaultTimeout       Default timeout to return if no 
{@link RpcTimeout} annotated parameter
+                *                             has been found
+                * @return Timeout extracted from the array of arguments or the 
default timeout
+                */
+               private static Timeout extractRpcTimeout(Annotation[][] 
parameterAnnotations, Object[] args,
+                       Timeout defaultTimeout) {
+                       if (args != null) {
+                               
Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+                               for (int i = 0; i < 
parameterAnnotations.length; i++) {
+                                       if 
(isRpcTimeout(parameterAnnotations[i])) {
+                                               if (args[i] instanceof 
FiniteDuration) {
+                                                       return new 
Timeout((FiniteDuration) args[i]);
+                                               } else {
+                                                       throw new 
RuntimeException("The rpc timeout parameter must be of type " +
+                                                               
FiniteDuration.class.getName() + ". The type " + args[i].getClass().getName() +
+                                                               " is not 
supported.");
+                                               }
+                                       }
+                               }
+                       }
+
+                       return defaultTimeout;
+               }
+
+               /**
+                * Removes all {@link RpcTimeout} annotated parameters from the 
parameter type and argument
+                * list.
+                *
+                * @param parameterTypes       Array of parameter types
+                * @param parameterAnnotations Array of parameter annotations
+                * @param args                 Array of arguments
+                * @return Tuple of filtered parameter types and arguments 
which no longer contain the
+                * {@link RpcTimeout} annotated parameter types and arguments
+                */
+               private static Tuple2<Class<?>[], Object[]> filterArguments(
+                       Class<?>[] parameterTypes,
+                       Annotation[][] parameterAnnotations,
+                       Object[] args) {
+
+                       Class<?>[] filteredParameterTypes;
+                       Object[] filteredArgs;
+
+                       if (args == null) {
+                               filteredParameterTypes = parameterTypes;
+                               filteredArgs = null;
+                       } else {
+                               
Preconditions.checkArgument(parameterTypes.length == 
parameterAnnotations.length);
+                               
Preconditions.checkArgument(parameterAnnotations.length == args.length);
+
+                               BitSet isRpcTimeoutParameter = new 
BitSet(parameterTypes.length);
+                               int numberRpcParameters = parameterTypes.length;
+
+                               for (int i = 0; i < parameterTypes.length; i++) 
{
+                                       if 
(isRpcTimeout(parameterAnnotations[i])) {
+                                               isRpcTimeoutParameter.set(i);
+                                               numberRpcParameters--;
+                                       }
+                               }
+
+                               if (numberRpcParameters == 
parameterTypes.length) {
+                                       filteredParameterTypes = parameterTypes;
+                                       filteredArgs = args;
+                               } else {
+                                       filteredParameterTypes = new 
Class<?>[numberRpcParameters];
+                                       filteredArgs = new 
Object[numberRpcParameters];
+                                       int counter = 0;
+
+                                       for (int i = 0; i < 
parameterTypes.length; i++) {
+                                               if 
(!isRpcTimeoutParameter.get(i)) {
+                                                       
filteredParameterTypes[counter] = parameterTypes[i];
+                                                       filteredArgs[counter] = 
args[i];
+                                                       counter++;
+                                               }
+                                       }
+                               }
+                       }
+                       return Tuple2.of(filteredParameterTypes, filteredArgs);
+               }
+
+               /**
+                * Checks whether any of the annotations is of type {@link 
RpcTimeout}
+                *
+                * @param annotations Array of annotations
+                * @return True if {@link RpcTimeout} was found; otherwise false
+                */
+               private static boolean isRpcTimeout(Annotation[] annotations) {
+                       for (Annotation annotation : annotations) {
+                               if 
(annotation.annotationType().equals(RpcTimeout.class)) {
+                                       return true;
+                               }
+                       }
+
+                       return false;
+               }
+
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/454bf51b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
new file mode 100644
index 0000000..dfffeda
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/resourcemanager/ResourceManagerHATest.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rpc.resourcemanager;
+
+import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
+import org.apache.flink.runtime.rpc.MainThreadExecutor;
+import org.apache.flink.runtime.rpc.RpcEndpoint;
+import org.apache.flink.runtime.rpc.RpcGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.rpc.StartStoppable;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.UUID;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * High-availability test for the {@link ResourceManager}: verifies that its
+ * leader session id is set when leadership is granted and cleared again when
+ * leadership is revoked.
+ */
+public class ResourceManagerHATest {
+
+       /**
+        * Grants and then revokes leadership through a
+        * {@link TestingLeaderElectionService} and checks the leader session id
+        * reported by the {@link ResourceManager} after each step.
+        */
+       @Test
+       public void testGrantAndRevokeLeadership() throws Exception {
+               // Mock an RpcService whose startServer method returns a special
+               // RpcGateway; that gateway's real runAsync implementation is
+               // invoked, which runs the given Runnable directly.
+               TestingResourceManagerGatewayProxy gateway = 
mock(TestingResourceManagerGatewayProxy.class);
+               doCallRealMethod().when(gateway).runAsync(any(Runnable.class));
+
+               RpcService rpcService = mock(RpcService.class);
+               
when(rpcService.startServer(any(RpcEndpoint.class))).thenReturn(gateway);
+
+               TestingLeaderElectionService leaderElectionService = new 
TestingLeaderElectionService();
+               TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
+               
highAvailabilityServices.setResourceManagerLeaderElectionService(leaderElectionService);
+
+               final ResourceManager resourceManager = new 
ResourceManager(rpcService, highAvailabilityServices);
+               resourceManager.start();
+               // Before leadership is granted, the leader session id is null.
+               Assert.assertNull(resourceManager.getLeaderSessionID());
+               final UUID leaderId = UUID.randomUUID();
+               leaderElectionService.isLeader(leaderId);
+               // After leadership is granted, the leader session id equals the
+               // granted id.
+               Assert.assertEquals(leaderId, 
resourceManager.getLeaderSessionID());
+               // After leadership is revoked, the leader session id is null
+               // again.
+               leaderElectionService.notLeader();
+               Assert.assertNull(resourceManager.getLeaderSessionID());
+       }
+
+       /**
+        * Gateway stub used as the mocked RPC server handle. Only
+        * {@code runAsync} has a real implementation, which runs the given
+        * {@link Runnable} synchronously in the calling thread; the class is
+        * abstract so the remaining interface methods are left to the mock.
+        */
+       private static abstract class TestingResourceManagerGatewayProxy 
implements MainThreadExecutor, StartStoppable, RpcGateway {
+               @Override
+               public void runAsync(Runnable runnable) {
+                       runnable.run();
+               }
+       }
+
+}

Reply via email to