Github user tillrohrmann commented on a diff in the pull request:
    --- Diff: 
    @@ -0,0 +1,796 @@
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.flink.runtime.clusterframework;
    +import akka.dispatch.OnComplete;
    +import akka.pattern.Patterns;
    +import akka.util.Timeout;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.akka.FlinkUntypedActor;
    +import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
    +import org.apache.flink.runtime.clusterframework.messages.RegisterResource;
    +import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
    +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
    +import org.apache.flink.runtime.clusterframework.messages.StopCluster;
    +import org.apache.flink.runtime.clusterframework.types.ResourceID;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.messages.RegistrationMessages;
    +import org.apache.flink.util.ExceptionUtils;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import scala.concurrent.Future;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +import java.util.ArrayList;
    +import java.util.Collection;
    +import java.util.HashMap;
    +import java.util.HashSet;
    +import java.util.List;
    +import java.util.Map;
    +import java.util.Set;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +import static java.util.Objects.requireNonNull;
    + *
    + * <h1>Worker allocation steps</h1>
    + *
    + * <ol>
    + *     <li>The resource manager decides to request more workers. This can 
happen in order
    + *         to fill the initial pool, or as a result of the JobManager 
requesting more workers.</li>
    + *
    + *     <li>The resource master calls {@link #requestNewWorkers(int)}, 
which triggers requests
    + *         for more containers. After that, the {@link #getNumWorkerRequestsPending()}
    + *         should reflect the pending requests.</li>
    + *
    + *     <li>The concrete framework may acquire containers and then trigger 
to start TaskManagers
    + *         in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li>
    + *
    + *     <li>At some point, the TaskManager processes will have started and 
send a registration
    + *         message to the JobManager. The JobManager will perform
    + *         a lookup with the ResourceManager to check if it really started 
this TaskManager.
    + *         The method {@link #workerRegistered(ResourceID)} will be called
    + *         to inform about a registered worker.</li>
    + * </ol>
    + *
    + */
    +public abstract class FlinkResourceManager<WorkerType extends ResourceID> 
extends FlinkUntypedActor {
    +   /** The exit code with which the process is stopped in case of a fatal 
error */
    +   protected static final int EXIT_CODE_FATAL_ERROR = -13;
    +   /** The default name of the resource manager actor */
    +   public static final String RESOURCE_MANAGER_NAME = "resourcemanager";
    +   // 
    +   /** The logger, named for the actual implementing class */
    +   protected final Logger log = LoggerFactory.getLogger(getClass());
    +   /** The Flink configuration object */
    +   protected final Configuration config;
    +   /** The timeout for actor messages sent to the JobManager / 
TaskManagers */
    +   private final FiniteDuration messageTimeout;
    +   /** The service to find the right leader JobManager (to support high 
availability) */
    +   private final LeaderRetrievalService leaderRetriever;
    +   /** The currently registered resources */
    +   private final Map<ResourceID, WorkerType> registeredWorkers;
    +   /** List of listeners for info messages */
    +   private final Set<ActorRef> infoMessageListeners;
    +   /** The JobManager that the framework master manages resources for */
    +   private ActorRef jobManager;
    +   /** Our JobManager's leader session */
    +   private UUID leaderSessionID;
    +   /** The size of the worker pool that the resource master strives to 
maintain */
    +   private int designatedPoolSize;
    +   // 
    +   /**
    +    * Creates a FlinkResourceManager actor.
    +    *
    +    * @param flinkConfig The Flink configuration object.
    +    */
    +   protected FlinkResourceManager(
    +                   int numInitialTaskManagers,
    +                   Configuration flinkConfig,
    +                   LeaderRetrievalService leaderRetriever) {
    +           this.config = requireNonNull(flinkConfig);
    +           this.leaderRetriever = requireNonNull(leaderRetriever);
    +           this.registeredWorkers = new HashMap<>();
    +           FiniteDuration lt;
    +           try {
    +                   lt = AkkaUtils.getLookupTimeout(config);
    +           }
    +           catch (Exception e) {
    +                   lt = new FiniteDuration(
    +                           TimeUnit.MILLISECONDS);
    +           }
    +           this.messageTimeout = lt;
    +           this.designatedPoolSize = numInitialTaskManagers;
    +           this.infoMessageListeners = new HashSet<>();
    +   }
    +   // 
    +   //  Actor Behavior
    +   // 
    +   @Override
    +   public void preStart() {
    +           try {
    +                   // we start our leader retrieval service to make sure 
we get informed
    +                   // about JobManager leader changes
    +                   leaderRetriever.start(new LeaderRetrievalListener() {
    +                           @Override
    +                           public void notifyLeaderAddress(String 
leaderAddress, UUID leaderSessionID) {
    +                                   self().tell(
    +                                           new 
NewLeaderAvailable(leaderAddress, leaderSessionID),
    +                                           ActorRef.noSender());
    +                           }
    +                           @Override
    +                           public void handleError(Exception e) {
    +                                   self().tell(
    +                                           new FatalErrorOccurred("Leader 
retrieval service failed", e),
    +                                           ActorRef.noSender());
    +                           }
    +                   });
    +                   // framework specific initialization
    +                   initialize();
    +           }
    +           catch (Throwable t) {
    +                   self().tell(
    +                           new FatalErrorOccurred("Error during startup of 
ResourceManager actor", t),
    +                           ActorRef.noSender());
    +           }
    +   }
    +   @Override
    +   public void postStop() {
    +           try {
    +                   leaderRetriever.stop();
    +           }
    +           catch (Throwable t) {
    +                   LOG.error("Could not cleanly shut down leader retrieval 
service", t);
    +           }
    +   }
    +   /**
    +    *
    +    * This method receives the actor messages after they have been 
filtered for
    +    * a match with the leader session.
    +    *
    +    * @param message The incoming actor message.
    +    */
    +   @Override
    +   protected void handleMessage(Object message) {
    +           try {
    +                   // --- messages about worker allocation and pool sizes
    +                   if (message instanceof CheckAndAllocateContainers) {
    +                           checkWorkersPool();
    +                   }
    +                   else if (message instanceof SetWorkerPoolSize) {
    +                           SetWorkerPoolSize msg = (SetWorkerPoolSize) 
    +                   }
    +                   else if (message instanceof RemoveResource) {
    +                           RemoveResource msg = (RemoveResource) message;
    +                           removeRegisteredResource(msg.resourceId());
    +                   }
    +                   // --- lookup of registered resources
    +                   else if (message instanceof RegisterResource) {
    +                           RegisterResource msg = (RegisterResource) 
    +                           handleRegisterResource(sender(), 
msg.getTaskManager(), msg.getRegisterMessage());
    +                   }
    +                   // --- messages about JobManager leader status and 
    +                   else if (message instanceof NewLeaderAvailable) {
    +                           NewLeaderAvailable msg = (NewLeaderAvailable) 
newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId());
    +                   }
    +                   else if (message instanceof 
TriggerRegistrationAtJobManager) {
    +                           TriggerRegistrationAtJobManager msg = 
(TriggerRegistrationAtJobManager) message;
    +                   }
    +                   else if (message instanceof 
RegisterResourceManagerSuccessful) {
    +                           RegisterResourceManagerSuccessful msg = 
(RegisterResourceManagerSuccessful) message;
    +                           jobManagerLeaderConnected(msg.jobManager(), 
    +                   }
    +                   // --- end of application
    +                   else if (message instanceof StopCluster) {
    +                           StopCluster msg = (StopCluster) message;
    +                           shutdownCluster(msg.finalStatus(), 
    +                   }
    +                   // --- miscellaneous messages
    +                   else if (message instanceof 
RegisterInfoMessageListener) {
    +                           if (jobManager != null) {
    +                                   infoMessageListeners.add(sender());
    +                                   sender().tell(decorateMessage(
    +                                           // answer as the JobManager
    +                                           jobManager);
    +                           }
    +                   }
    +                   else if (message instanceof 
UnRegisterInfoMessageListener) {
    +                           infoMessageListeners.remove(sender());
    +                   }
    +                   // --- unknown messages
    +                   else {
    +                           LOG.error("Discarding unknown message: {}", 
    +                   }
    +           }
    +           catch (Throwable t) {
    +                   // fatal error, needs master recovery
    +                   fatalError("Error processing actor message", t);
    +           }
    +   }
    +   @Override
    +   protected final UUID getLeaderSessionID() {
    +           return leaderSessionID;
    +   }
    +   // 
    +   //  Status
    +   // 
    +   /**
    +    * Gets the current designated worker pool size, meaning the number of workers
    +    * that the resource master strives to maintain. The actual number of workers
    +    * may be lower (if worker requests are still pending) or higher (if 
workers have
    +    * not yet been released).
    +    *
    +    * @return The designated worker pool size.
    +    */
    +   public int getDesignatedWorkerPoolSize() {
    +           return designatedPoolSize;
    +   }
    +   /**
    +    * Gets the number of currently registered TaskManagers.
    +    *
    +    * @return The number of currently registered TaskManagers.
    +    */
    +   public int getNumberOfRegisteredTaskManagers() {
    +           return registeredWorkers.size();
    +   }
    +   /**
    +    * Gets the currently registered resources.
    +    * @return
    +    */
    +   public Collection<WorkerType> getRegisteredTaskManagers() {
    +           return registeredWorkers.values();
    +   }
    +   /**
    +    * Checks whether a worker is registered for the given resource ID.
    +    *
    +    * @param resourceId The resource ID for the worker.
    +    * @return True if already registered, otherwise false
    +    */
    +   public boolean isRegistered(ResourceID resourceId) {
    +           return registeredWorkers.containsKey(resourceId);
    +   }
    +   /**
    +    * Gets an iterable for all currently registered TaskManagers.
    +    *
    +    * @return All currently registered TaskManagers.
    +    */
    +   public Collection<WorkerType> allRegisteredWorkers() {
    +           return registeredWorkers.values();
    +   }
    +   /**
    +    * Register a resource on which a TaskManager has been started
    +    * @param jobManager The sender (JobManager) of the message
    +    * @param taskManager The task manager who wants to register
    +    * @param msg The task manager's registration message
    +    */
    +   private void handleRegisterResource(ActorRef jobManager, ActorRef 
    +                           RegistrationMessages.RegisterTaskManager msg) {
    +           ResourceID resourceID = msg.resourceId();
    +           try {
    +                   Preconditions.checkNotNull(resourceID);
    +                   WorkerType newWorker = 
    +                   WorkerType oldWorker = 
registeredWorkers.put(resourceID, newWorker);
    +                   if (oldWorker != null) {
    +                           LOG.warn("Worker {} had been registered 
before.", resourceID);
    +                   }
    +                   jobManager.tell(decorateMessage(
    +                           new RegisterResourceSuccessful(taskManager, 
    +                           self());
    +           } catch (Exception e) {
    +                   // This may happen on duplicate task manager 
registration message to the job manager
    +                   LOG.warn("TaskManager resource registration failed for 
{}", resourceID);
    +                   // tell the JobManager about the failure
    +                   String eStr = ExceptionUtils.stringifyException(e);
    +                   sender().tell(decorateMessage(
    +                           new RegisterResourceFailed(taskManager, 
resourceID, eStr)), self());
    +           }
    +   }
    +   /**
    +    * Releases the given resource. Note that this does not automatically adjust
    +    * the designated worker pool size.
    +    *
    +    * @param resourceId The TaskManager's resource id.
    +    */
    +   private void removeRegisteredResource(ResourceID resourceId) {
    +           WorkerType worker = registeredWorkers.remove(resourceId);
    +           if (worker != null) {
    +                   releaseRegisteredWorker(worker);
    +           } else {
    +                   LOG.warn("Resource {} could not be released", 
    +           }
    +   }
    +   // 
    +   //  Registration and consolidation with JobManager Leader
    +   // 
    +   /**
    +    * Called as soon as we discover (via leader election) that a 
JobManager lost leadership
    +    * or a different one gained leadership.
    +    *
    +    * @param leaderAddress The address (Akka URL) of the new leader. Null 
if there is currently no leader.
    +    * @param leaderSessionID The unique session ID marking the leadership session.
    +    */
    +   protected void newJobManagerLeaderAvailable(String leaderAddress, UUID 
leaderSessionID) {
    +           log.debug("Received new leading JobManager {}. Connecting.", 
    +           // disconnect from the current leader (no-op if no leader yet)
    +           jobManagerLostLeadership();
    +           // a null leader address means that only a leader disconnect
    +           // happened, without a new leader yet
    +           if (leaderAddress != null) {
    +                   // the leaderSessionID implicitly filters out success 
and failure messages
    +                   // that come after leadership changed again
    +                   this.leaderSessionID = leaderSessionID;
    +                   triggerConnectingToJobManager(leaderAddress);
    +           }
    +   }
    +   /**
    +    * Causes the resource manager to announce itself at the new leader 
JobManager and
    +    * obtains its connection information and currently known TaskManagers.
    +    *
    +    * @param leaderAddress The akka actor URL of the new leader JobManager.
    +    */
    +   private void triggerConnectingToJobManager(String leaderAddress) {
    + "Trying to associate with JobManager leader " + 
    +           final Object registerMessage = decorateMessage(new 
    +           final Object retryMessage = decorateMessage(new 
    +           // send the registration message to the JobManager
    +           ActorSelection jobManagerSel = 
    +           Future<Object> future = Patterns.ask(jobManagerSel, 
registerMessage, new Timeout(messageTimeout));
    +           future.onComplete(new OnComplete<Object>() {
    +                   @Override
    +                   public void onComplete(Throwable failure, Object msg) {
    +                           if (msg != null) {
    +                                   if (msg instanceof LeaderSessionMessage 
    +                                           ((LeaderSessionMessage) 
msg).message() instanceof RegisterResourceManagerSuccessful)
    +                                   {
    +                                           self().tell(msg, 
    +                                   }
    +                                   else {
    +                                           LOG.error("Invalid response 
type to registration at JobManager: {}", msg);
    +                                           self().tell(retryMessage, 
    +                                   }
    +                           }
    +                           else {
    +                                   // no success
    +                                   LOG.error("Resource manager could not 
register at JobManager", failure);
    +                                   self().tell(retryMessage, 
    +                           }
    +                   }
    +           }, context().dispatcher());
    +   }
    +   /**
    +    * This method disassociates from the current leader JobManager. All 
currently registered
    +    * TaskManagers are put under "awaiting registration".
    +    */
    +   private void jobManagerLostLeadership() {
    +           if (jobManager != null) {
    +         "Associated JobManager {} lost leader status", 
    +                   jobManager = null;
    +                   leaderSessionID = null;
    +                   infoMessageListeners.clear();
    +                   registeredWorkers.clear();
    +           }
    +   }
    +   /**
    +    * Callback when we're informed about a new leading JobManager.
    +    * @param newJobManagerLeader The ActorRef of the new jobManager
    +    * @param workers The existing workers the JobManager has registered.
    +    */
    +   private void jobManagerLeaderConnected(
    +                                           ActorRef newJobManagerLeader,
    +                                           Collection<ResourceID> workers) 
    +           if (jobManager == null) {
    +         "Resource Manager associating with leading 
JobManager {} - leader session {}",
    +                                           newJobManagerLeader, 
    +                   jobManager = newJobManagerLeader;
    +                   // inform the framework that we have updated the leader
    +                   leaderUpdated();
    +                   if (workers.size() > 0) {
    +                 "Received TaskManagers that were 
registered at the leader JobManager. " +
    +                                           "Trying to consolidate.");
    +                           // keep track of which TaskManagers are not 
    +                           List<ResourceID> toHandle = new 
    +                           for (ResourceID resourceID : workers) {
    +                                   toHandle.add(resourceID);
    +                           }
    +                           try {
    +                                   // ask the framework to tell us which 
ones we should keep for now
    +                                   Collection<WorkerType> consolidated = 
    +                         "Consolidated {} 
TaskManagers", consolidated.size());
    +                                   // put the consolidated TaskManagers 
into our bookkeeping
    +                                   for (WorkerType worker : consolidated) {
    +                                           registeredWorkers.put(worker, 
    +                                           toHandle.remove(worker);
    +                                   }
    +                           }
    +                           catch (Throwable t) {
    +                                   LOG.error("Error during consolidation 
of known TaskManagers", t);
    +                                   // the framework should release the 
remaining unclear resources
    +                                   for (ResourceID id : toHandle) {
    +                                           releasePendingWorker(id);
    +                                   }
    +                           }
    +                   }
    +                   // trigger initial check for requesting new workers
    +                   checkWorkersPool();
    +           } else {
    +                   String msg = "Attempting to associate with new 
JobManager leader " + newJobManagerLeader
    +                           + " without previously disassociating from 
current leader " + jobManager;
    +                   fatalError(msg, new Exception(msg));
    +           }
    +   }
    +   // 
    +   //  Cluster Shutdown
    +   // 
    +   private void shutdownCluster(ApplicationStatus status, String 
diagnostics) {
    + "Shutting down cluster with status {} : {}", status, 
    +           shutdownApplication(status, diagnostics);
    +   }
    +   // 
    +   //  Worker pool size management
    +   // 
    +   /**
    +    * This method causes the resource framework master to re-examine
    +    * the set of available and pending worker containers, and allocate more
    +    * if needed.
    +    *
    +    * This method does not automatically release workers, because it is 
not visible to
    +    * this resource master which workers can be released. Instead, the 
JobManager must
    +    * explicitly release individual workers.
    +    */
    +   private void checkWorkersPool() {
    +           int numWorkersPending = getNumWorkerRequestsPending();
    +           int numWorkersPendingRegistration = 
    +           // sanity checks
    +           Preconditions.checkState(numWorkersPending >= 0,
    +                   "Number of pending workers should never be below 0.");
    +           Preconditions.checkState(numWorkersPendingRegistration >= 0,
    +                   "Number of pending workers pending registration should 
never be below 0.");
    +           // see how many workers we want, and whether we have enough
    +           int allAvailableAndPending = registeredWorkers.size() +
    +                   numWorkersPending + numWorkersPendingRegistration;
    +           int missing = designatedPoolSize - allAvailableAndPending;
    +           if (missing > 0) {
    +                   requestNewWorkers(missing);
    +           }
    +   }
    +   /**
    +    * Sets the designated worker pool size. If this size is larger than 
the current pool
    +    * size, then the resource manager will try to acquire more workers.
    +    *
    +    * @param num The number of workers in the pool.
    +    */
    +   private void adjustDesignatedNumberOfWorkers(int num) {
    +           if (num >= 0) {
    +         "Adjusting designated worker pool size to {}", 
    +                   designatedPoolSize = num;
    +                   checkWorkersPool();
    +           } else {
    +                   log.warn("Ignoring invalid designated worker pool size: 
" + num);
    +           }
    +   }
    +   // 
    +   //  Callbacks
    +   // 
    +   /**
    +    * This method causes the resource framework master to re-examine
    +    * the set of available and pending worker containers, and release or allocate
    +    * containers if needed. The method sends an actor message which will 
trigger the
    +    * re-examination.
    +    */
    +   public void triggerCheckWorkers() {
    +           self().tell(
    +                   decorateMessage(
    +                           CheckAndAllocateContainers.get()),
    +                   ActorRef.noSender());
    +   }
    +   /**
    +    * This method should be called by the framework once it detects that a 
currently registered
    +    * worker has failed.
    +    *
    +    * @param resourceID Id of the worker that has failed.
    +    * @param message An informational message that explains why the worker failed.
    +    */
    +   public void notifyWorkerFailed(ResourceID resourceID, String message) {
    +           WorkerType worker = registeredWorkers.remove(resourceID);
    +           if (worker != null) {
    +                   jobManager.tell(
    +                           decorateMessage(
    +                                   new ResourceRemoved(resourceID, 
    +                           self());
    +           }
    +   }
    +   // 
    +   //  Framework specific behavior
    +   // 
    +   /**
    +    * Initializes the framework specific components.
    +    *
    +    * @throws Exception Exceptions during initialization cause the 
resource manager to fail.
    +    *                   If the framework is able to recover this resource 
manager, it will be
    +    *                   restarted.
    +    */
    +   protected abstract void initialize() throws Exception;
    +   /**
    +    * Provides code to handle an update of the leader (relevant for HA). 
The framework has to deal
    +    * with the consequences of a leader update.
    +    */
    +   protected abstract void leaderUpdated();
    +   /**
    +    * The framework specific code for shutting down the application. This 
should report the
    +    * application's final status and shut down the resource manager 
    +    *
    +    * This method also needs to make sure all pending containers that are 
not registered
    +    * yet are returned.
    +    *
    +    * @param finalStatus The application status to report.
    +    * @param optionalDiagnostics An optional diagnostics message.
    +    */
    +   protected abstract void shutdownApplication(ApplicationStatus 
finalStatus, String optionalDiagnostics);
    +   /**
    +    * Notifies the resource master of a fatal error.
    +    *
    +    * <p><b>IMPORTANT:</b> This should not cleanly shut down this master, 
but exit it in
    +    * such a way that a high-availability setting would restart this or 
fail over
    +    * to another master.
    +    */
    +   protected abstract void fatalError(String message, Throwable error);
    +   /**
    +    * Requests to allocate a certain number of new workers.
    +    *
    +    * @param numWorkers The number of workers to allocate.
    +    */
    +   protected abstract void requestNewWorkers(int numWorkers);
    +   /**
    +    * Trigger a release of a pending worker.
    +    * @param resourceID The worker resource id
    +    */
    +   protected abstract void releasePendingWorker(ResourceID resourceID);
    +   /**
    +    * Trigger a release of a registered worker.
    +    * @param resourceID The worker resource id
    +    */
    +   protected abstract void releaseRegisteredWorker(WorkerType resourceID);
    +   /**
    +    * Callback when a worker was registered.
    +    * @param resourceID The worker resource id
    +    */
    +   protected abstract WorkerType workerRegistered(ResourceID resourceID) 
throws Exception;
    +   /**
    +    * This method is called when the resource manager starts after a 
failure and reconnects to
    +    * the leader JobManager, who still has some workers registered. The 
method is used to consolidate
    +    * the view between resource manager and JobManager. The resource 
manager gets the list of TaskManagers
    +    * that the JobManager considers available and should return a list of 
nodes that the
    +    * resource manager considers available.
    +    *
    +    * After that, the JobManager is informed of loss of all TaskManagers 
that are not part of the
    +    * returned list.
    +    *
    +    * It is possible that the resource manager initially confirms some 
TaskManagers to be alive, even
    +    * though they are in an uncertain status, if it later sends necessary 
failure notifications
    +    * via calling {@link #notifyWorkerFailed(ResourceID, String)}.
    +    *
    +    * @param registered The list of TaskManagers that the JobManager knows.
    +    * @return The subset of TaskManagers that the resource manager can 
confirm to be alive.
    +    */
    +   protected abstract Collection<WorkerType> 
reacceptRegisteredWorkers(Collection<ResourceID> registered);
    +   /**
    +    * Gets the number of requested workers that have not yet been granted.
    +    *
    +    * @return The number pending worker requests. Must never be smaller 
than 0.
    +    */
    +   protected abstract int getNumWorkerRequestsPending();
    +   /**
    +    * Gets the number of containers that have been started, but where the TaskManager
    +    * has not yet registered at the job manager.
    +    *
    +    * @return The number of started containers pending TaskManager registration.
    +    * Must never be smaller than 0.
    +    */
    +   protected abstract int getNumWorkersPendingRegistration();
    +   // 
    +   //  Info messaging
    +   // 
    +   protected void sendInfoMessage(String message) {
    +           for (ActorRef listener : infoMessageListeners) {
    +                   listener.tell(decorateMessage(new 
InfoMessage(message)), self());
    +           }
    +   }
    +   // 
    +   //  Startup
    +   // 
    +   public static void startResourceManager() {
    +   }
    +   /**
    +    * Starts the resource manager actors.
    +    * @param configuration The configuration for the resource manager
    +    * @param actorSystem The actor system to start the resource manager in
    +    * @param leaderRetriever The leader retriever service to initialize the 
resource manager
    +    * @param resourceManagerClass The class of the ResourceManager to be started
    +    * @return ActorRef of the resource manager
    +    */
    +   public static ActorRef startResourceManagerActors(
    +           Configuration configuration,
    +           ActorSystem actorSystem,
    +           LeaderRetrievalService leaderRetriever,
    +           Class<? extends FlinkResourceManager<?>> resourceManagerClass
    +   ) {
    +           return startResourceManagerActors(
    +                   configuration, actorSystem, leaderRetriever, 
    +                   RESOURCE_MANAGER_NAME + "-" + UUID.randomUUID());
    +   }
    +   /**
    +    * Starts the resource manager actors.
    +    * @param configuration The configuration for the resource manager
    +    * @param actorSystem The actor system to start the resource manager in
    +    * @param leaderRetriever The leader retriever service to initialize the 
resource manager
    +    * @param resourceManagerClass The class of the ResourceManager to be started
    +    * @param resourceManagerActorName The name of the resource manager actor
    +    * @return ActorRef of the resource manager
    +    */
    +   public static ActorRef startResourceManagerActors(
    +           Configuration configuration,
    +           ActorSystem actorSystem,
    +           LeaderRetrievalService leaderRetriever,
    +           Class<? extends FlinkResourceManager<?>> resourceManagerClass,
    +           String resourceManagerActorName
    +   ) {
    --- End diff --
    Same here

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure or file a JIRA ticket
with INFRA.

Reply via email to