[ 
https://issues.apache.org/jira/browse/FLINK-4516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15449277#comment-15449277
 ] 

ASF GitHub Bot commented on FLINK-4516:
---------------------------------------

Github user mxm commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2427#discussion_r76815420
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingSerialRpcService.java
 ---
    @@ -0,0 +1,336 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rpc;
    +
    +import akka.dispatch.ExecutionContexts;
    +import akka.dispatch.Futures;
    +import akka.util.Timeout;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.runtime.util.DirectExecutorService;
    +import org.apache.flink.util.Preconditions;
    +import scala.concurrent.Await;
    +import scala.concurrent.ExecutionContext;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.lang.annotation.Annotation;
    +import java.lang.reflect.InvocationHandler;
    +import java.lang.reflect.Method;
    +import java.lang.reflect.Proxy;
    +import java.util.BitSet;
    +import java.util.UUID;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.TimeUnit;
    +
    +
    +/**
    + * An RPC Service implementation for testing. This RPC service directly 
executes all asynchronous calls one by one in the main thread.
    + */
    +public class TestingSerialRpcService extends TestingRpcService {
    +
    +   private final DirectExecutorService executorService;
    +   private final ConcurrentHashMap<String, RpcGateway> 
registeredConnections;
    +
    +   public TestingSerialRpcService() {
    +           executorService = new DirectExecutorService();
    +           this.registeredConnections = new ConcurrentHashMap<>();
    +   }
    +
    +   @Override
    +   public void scheduleRunnable(final Runnable runnable, final long delay, 
final TimeUnit unit) {
    +           try {
    +                   unit.sleep(delay);
    +                   runnable.run();
    +           } catch (Throwable e) {
    +                   throw new RuntimeException(e);
    +           }
    +   }
    +
    +   @Override
    +   public ExecutionContext getExecutionContext() {
    +           return ExecutionContexts.fromExecutorService(executorService);
    +   }
    +
    +   @Override
    +   public void stopService() {
    +           executorService.shutdown();
    +           registeredConnections.clear();
    +   }
    +
    +   @Override
    +   public void stopServer(RpcGateway selfGateway) {
    +
    +   }
    +
    +   @Override
    +   public <C extends RpcGateway, S extends RpcEndpoint<C>> C startServer(S 
rpcEndpoint) {
    +           final String address = UUID.randomUUID().toString();
    +
    +           InvocationHandler akkaInvocationHandler = new 
TestingSerialInvocationHandler(address, rpcEndpoint);
    +           ClassLoader classLoader = getClass().getClassLoader();
    +
    +           @SuppressWarnings("unchecked")
    +           C self = (C) Proxy.newProxyInstance(
    +                   classLoader,
    +                   new Class<?>[]{
    +                           rpcEndpoint.getSelfGatewayType(),
    +                           MainThreadExecutor.class,
    +                           StartStoppable.class,
    +                           RpcGateway.class},
    +                   akkaInvocationHandler);
    +
    +           return self;
    +   }
    +
    +   private static class TestingSerialInvocationHandler<C extends 
RpcGateway, T extends RpcEndpoint<C>> implements InvocationHandler, RpcGateway, 
MainThreadExecutor, StartStoppable {
    +
    +           private final T rpcEndpoint;
    +
    +           /** default timeout for asks */
    +           private final Timeout timeout;
    +
    +           private final String address;
    +
    +           private TestingSerialInvocationHandler(String address, T 
rpcEndpoint) {
    +                   this(address, rpcEndpoint, new Timeout(new 
FiniteDuration(10, TimeUnit.SECONDS)));
    +           }
    +
    +           private TestingSerialInvocationHandler(String address, T 
rpcEndpoint, Timeout timeout) {
    +                   this.rpcEndpoint = rpcEndpoint;
    +                   this.timeout = timeout;
    +                   this.address = address;
    +           }
    +
    +           @Override
    +           public Object invoke(Object proxy, Method method, Object[] 
args) throws Throwable {
    +                   Class<?> declaringClass = method.getDeclaringClass();
    +                   if (declaringClass.equals(MainThreadExecutor.class) ||
    +                           declaringClass.equals(Object.class) || 
declaringClass.equals(StartStoppable.class) ||
    +                           declaringClass.equals(RpcGateway.class)) {
    +                           return method.invoke(this, args);
    +                   } else {
    +                           final String methodName = method.getName();
    +                           Class<?>[] parameterTypes = 
method.getParameterTypes();
    +                           Annotation[][] parameterAnnotations = 
method.getParameterAnnotations();
    +                           Timeout futureTimeout = 
extractRpcTimeout(parameterAnnotations, args, timeout);
    +
    +                           final Tuple2<Class<?>[], Object[]> 
filteredArguments = filterArguments(
    +                                   parameterTypes,
    +                                   parameterAnnotations,
    +                                   args);
    +
    +                           Class<?> returnType = method.getReturnType();
    +
    +                           if (returnType.equals(Future.class)) {
    +                                   try {
    +                                           Object result = 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);
    +                                           return 
Futures.successful(result);
    +                                   } catch (Throwable e) {
    +                                           return Futures.failed(e);
    +                                   }
    +                           } else {
    +                                   return 
handleRpcInvocationSync(methodName, filteredArguments.f0, filteredArguments.f1, 
futureTimeout);
    +                           }
    +                   }
    +           }
    +
    +           /**
    +            * Handle rpc invocations by looking up the rpc method on the 
rpc endpoint and calling this
    +            * method with the provided method arguments. If the method has 
a return value, it is returned
    +            * to the sender of the call.
    +            */
    +           private Object handleRpcInvocationSync(final String methodName,
    +                   final Class<?>[] parameterTypes,
    +                   final Object[] args,
    +                   final Timeout futureTimeout) throws Exception {
    +                   final Method rpcMethod = lookupRpcMethod(methodName, 
parameterTypes);
    +                   Object result = rpcMethod.invoke(rpcEndpoint, args);
    +
    +                   if (result != null && result instanceof Future) {
    +                           Future<?> future = (Future<?>) result;
    +                           return Await.result(future, 
futureTimeout.duration());
    +                   } else {
    +                           return result;
    +                   }
    +           }
    +
    +           @Override
    +           public void runAsync(Runnable runnable) {
    +                   runnable.run();
    +           }
    +
    +           @Override
    +           public <V> Future<V> callAsync(Callable<V> callable, Timeout 
callTimeout) {
    +                   try {
    +                           
TimeUnit.MILLISECONDS.sleep(callTimeout.duration().toMillis());
    +                           return Futures.successful(callable.call());
    +                   } catch (Throwable e) {
    +                           return Futures.failed(e);
    +                   }
    +           }
    +
    +           @Override
    +           public void scheduleRunAsync(final Runnable runnable, final 
long delay) {
    +                   try {
    +                           TimeUnit.MILLISECONDS.sleep(delay);
    +                           runnable.run();
    +                   } catch (Throwable e) {
    +                           throw new RuntimeException(e);
    +                   }
    +           }
    +
    +           @Override
    +           public void start() {
    --- End diff --
    
    No need to implement `StartStoppable` if you don't use the methods.


> ResourceManager leadership election
> -----------------------------------
>
>                 Key: FLINK-4516
>                 URL: https://issues.apache.org/jira/browse/FLINK-4516
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Cluster Management
>            Reporter: zhangjing
>            Assignee: zhangjing
>
> 1. When a resourceManager is started, it starts the leadership election 
> service first and take part in contending for leadership
> 2. Every resourceManager contains a ResourceManagerLeaderContender, when it 
> is granted leadership, it will start SlotManager and other main components. 
> when it is revoked leadership, it will stop all its components and clear 
> everything.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to