GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r56199223
  
    --- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
 ---
    @@ -0,0 +1,801 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSelection;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +import akka.dispatch.OnComplete;
    +import akka.pattern.Patterns;
    +import akka.util.Timeout;
    +
    +import com.google.common.base.Preconditions;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.akka.FlinkUntypedActor;
    +import 
org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers;
    +import 
org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred;
    +import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
    +import 
org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
    +import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed;
    +import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
    +import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful;
    +import 
org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable;
    +import 
org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener;
    +import 
org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
    +import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
    +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
    +import 
org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import 
org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
    +import 
org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import 
org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
    +
    +import org.apache.flink.runtime.messages.RegistrationMessages;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static java.util.Objects.requireNonNull;
    +
    +/**
    + *
    + * <h1>Worker allocation steps</h1>
    + *
    + * <ol>
    + *     <li>The resource manager decides to request more workers. This can 
happen in order
    + *         to fill the initial pool, or as a result of the JobManager 
requesting more workers.</li>
    + *
    + *     <li>The resource master calls {@link #requestNewWorkers(int)}, 
which triggers requests
    + *         for more containers. After that, the {@link 
#getNumWorkerRequestsPending()}
    + *         should reflect the pending requests.</li>
    + *
    + *     <li>The concrete framework may acquire containers and then trigger 
to start TaskManagers
    + *         in those containers. That should be reflected in {@link 
#getNumWorkersPendingRegistration()}.</li>
    + *
    + *     <li>At some point, the TaskManager processes will have started and 
send a registration
    + *         message to the JobManager. The JobManager will perform
    + *         a lookup with the ResourceManager to check if it really started 
this TaskManager.
    + *         The method {@link #workerRegistered(ResourceID)} will be called
    + *         to inform about a registered worker.</li>
    + * </ol>
    + *
    + */
    +public abstract class FlinkResourceManager<WorkerType extends ResourceID> 
extends FlinkUntypedActor {
    +
    +   /** The exit code with which the process is stopped in case of a fatal 
error */
    +   protected static final int EXIT_CODE_FATAL_ERROR = -13;
    +
    +   /** The default name of the resource manager actor */
    +   public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /** The logger, named for the actual implementing class */
    +   protected final Logger log = LoggerFactory.getLogger(getClass());
    +
    +   /** The Flink configuration object */
    +   protected final Configuration config;
    +
    +   /** The timeout for actor messages sent to the JobManager / 
TaskManagers */
    +   private final FiniteDuration messageTimeout;
    +
    +   /** The service to find the right leader JobManager (to support high 
availability) */
    +   private final LeaderRetrievalService leaderRetriever;
    +
    +   /** The currently registered resources */
    +   private final Map<ResourceID, WorkerType> registeredWorkers;
    +
    +   /** List of listeners for info messages */
    +   private final Set<ActorRef> infoMessageListeners;
    +
    +   /** The JobManager that the framework master manages resources for */
    +   private ActorRef jobManager;
    +
    +   /** Our JobManager's leader session */
    +   private UUID leaderSessionID;
    +
    +   /** The size of the worker pool that the resource master strives to 
maintain */
    +   private int designatedPoolSize;
    +
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * Creates a AbstractFrameworkMaster actor.
    +    *
    +    * @param flinkConfig The Flink configuration object.
    +    */
    +   protected FlinkResourceManager(
    +                   int numInitialTaskManagers,
    +                   Configuration flinkConfig,
    +                   LeaderRetrievalService leaderRetriever) {
    +           this.config = requireNonNull(flinkConfig);
    +           this.leaderRetriever = requireNonNull(leaderRetriever);
    +           this.registeredWorkers = new HashMap<>();
    +
    +           FiniteDuration lt;
    +           try {
    +                   lt = AkkaUtils.getLookupTimeout(config);
    +           }
    +           catch (Exception e) {
    +                   lt = new FiniteDuration(
    +                           
Duration.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(),
    +                           TimeUnit.MILLISECONDS);
    +           }
    +           this.messageTimeout = lt;
    +           this.designatedPoolSize = numInitialTaskManagers;
    +           this.infoMessageListeners = new HashSet<>();
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Actor Behavior
    +   // 
------------------------------------------------------------------------
    +
    +   @Override
    +   public void preStart() {
    +           // we start our leader retrieval service to make sure we get 
informed
    +           // about JobManager leader changes
    +           try {
    +                   leaderRetriever.start(new LeaderRetrievalListener() {
    +
    +                           @Override
    +                           public void notifyLeaderAddress(String 
leaderAddress, UUID leaderSessionID) {
    +                                   self().tell(
    +                                           new 
NewLeaderAvailable(leaderAddress, leaderSessionID),
    +                                           ActorRef.noSender());
    +                           }
    +
    +                           @Override
    +                           public void handleError(Exception e) {
    +                                   self().tell(
    +                                           new FatalErrorOccurred("Leader 
retrieval service failed", e),
    +                                           ActorRef.noSender());
    +                           }
    +                   });
    +           }
    +           catch (Throwable t) {
    +                   self().tell(
    +                           new FatalErrorOccurred("Could not start leader 
retrieval service", t),
    +                           ActorRef.noSender());
    +           }
    +           // framework specific initialization
    +           try {
    +                   initialize();
    --- End diff --
    
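    It probably does not make much sense to initialize the resource manager if an error occurred while starting the leader retrieval service: without it, the resource manager will never be notified about the leading JobManager. We could return from `preStart` right after sending the `FatalErrorOccurred` message instead of falling through to `initialize()`.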


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to have it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA
ticket with INFRA.
---

Reply via email to