[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15199280#comment-15199280 ]
ASF GitHub Bot commented on FLINK-3544: --------------------------------------- Github user mxm commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r56483788 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java --- @@ -0,0 +1,796 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSelection; +import akka.actor.ActorSystem; +import akka.actor.Props; +import akka.dispatch.OnComplete; +import akka.pattern.Patterns; +import akka.util.Timeout; + +import com.google.common.base.Preconditions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.akka.FlinkUntypedActor; +import org.apache.flink.runtime.clusterframework.messages.CheckAndAllocateContainers; +import org.apache.flink.runtime.clusterframework.messages.FatalErrorOccurred; +import org.apache.flink.runtime.clusterframework.messages.InfoMessage; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListenerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResource; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceFailed; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceSuccessful; +import org.apache.flink.runtime.clusterframework.messages.NewLeaderAvailable; +import org.apache.flink.runtime.clusterframework.messages.RegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager; +import org.apache.flink.runtime.clusterframework.messages.RemoveResource; +import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved; +import org.apache.flink.runtime.clusterframework.messages.SetWorkerPoolSize; +import org.apache.flink.runtime.clusterframework.messages.StopCluster; +import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager; +import org.apache.flink.runtime.clusterframework.messages.UnRegisterInfoMessageListener; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage; + +import org.apache.flink.runtime.messages.RegistrationMessages; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static java.util.Objects.requireNonNull; + +/** + * + * <h1>Worker allocation steps</h1> + * + * <ol> + * <li>The resource manager decides to request more workers. This can happen in order + * to fill the initial pool, or as a result of the JobManager requesting more workers.</li> + * + * <li>The resource master calls {@link #requestNewWorkers(int)}, which triggers requests + * for more containers. After that, the {@link #getNumWorkerRequestsPending()} + * should reflect the pending requests.</li> + * + * <li>The concrete framework may acquire containers and then trigger to start TaskManagers + * in those containers. That should be reflected in {@link #getNumWorkersPendingRegistration()}.</li> + * + * <li>At some point, the TaskManager processes will have started and send a registration + * message to the JobManager. The JobManager will perform + * a lookup with the ResourceManager to check if it really started this TaskManager. + * The method {@link #workerRegistered(ResourceID)} will be called + * to inform about a registered worker.</li> + * </ol> + * + */ +public abstract class FlinkResourceManager<WorkerType extends ResourceID> extends FlinkUntypedActor { + + /** The exit code with which the process is stopped in case of a fatal error */ + protected static final int EXIT_CODE_FATAL_ERROR = -13; + + /** The default name of the resource manager actor */ + public static final String RESOURCE_MANAGER_NAME = "resourcemanager"; + + // ------------------------------------------------------------------------ + + /** The logger, named for the actual implementing class */ + protected final Logger log = LoggerFactory.getLogger(getClass()); + + /** The Flink configuration object */ + protected final Configuration config; + + /** The timeout for actor messages sent to the JobManager / TaskManagers */ + private final FiniteDuration messageTimeout; + + /** The service to find the right leader JobManager (to support high availability) */ + private final LeaderRetrievalService leaderRetriever; + + /** The currently registered resources */ + private final Map<ResourceID, WorkerType> registeredWorkers; + + /** List of listeners for info messages */ + private final Set<ActorRef> infoMessageListeners; + + /** The JobManager that the framework master manages resources for */ + private ActorRef jobManager; + + /** Our JobManager's leader session */ + private UUID leaderSessionID; + + /** The size of the worker pool that the resource master strives to maintain */ + private int designatedPoolSize; + + // ------------------------------------------------------------------------ + + /** + * Creates a AbstractFrameworkMaster actor. + * + * @param flinkConfig The Flink configuration object. + */ + protected FlinkResourceManager( + int numInitialTaskManagers, + Configuration flinkConfig, + LeaderRetrievalService leaderRetriever) { + this.config = requireNonNull(flinkConfig); + this.leaderRetriever = requireNonNull(leaderRetriever); + this.registeredWorkers = new HashMap<>(); + + FiniteDuration lt; + try { + lt = AkkaUtils.getLookupTimeout(config); + } + catch (Exception e) { + lt = new FiniteDuration( + Duration.apply(ConfigConstants.DEFAULT_AKKA_LOOKUP_TIMEOUT).toMillis(), + TimeUnit.MILLISECONDS); + } + this.messageTimeout = lt; + this.designatedPoolSize = numInitialTaskManagers; + this.infoMessageListeners = new HashSet<>(); + } + + // ------------------------------------------------------------------------ + // Actor Behavior + // ------------------------------------------------------------------------ + + @Override + public void preStart() { + try { + // we start our leader retrieval service to make sure we get informed + // about JobManager leader changes + leaderRetriever.start(new LeaderRetrievalListener() { + + @Override + public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) { + self().tell( + new NewLeaderAvailable(leaderAddress, leaderSessionID), + ActorRef.noSender()); + } + + @Override + public void handleError(Exception e) { + self().tell( + new FatalErrorOccurred("Leader retrieval service failed", e), + ActorRef.noSender()); + } + }); + + // framework specific initialization + initialize(); + + } + catch (Throwable t) { + self().tell( + new FatalErrorOccurred("Error during startup of ResourceManager actor", t), + ActorRef.noSender()); + } + } + + @Override + public void postStop() { + try { + leaderRetriever.stop(); + } + catch (Throwable t) { + LOG.error("Could not cleanly shut down leader retrieval service", t); + } + } + + /** + * + * This method receives the actor messages after they have been filtered for + * a match with the leader session. + * + * @param message The incoming actor message. + */ + @Override + protected void handleMessage(Object message) { + try { + // --- messages about worker allocation and pool sizes + + if (message instanceof CheckAndAllocateContainers) { + checkWorkersPool(); + } + else if (message instanceof SetWorkerPoolSize) { + SetWorkerPoolSize msg = (SetWorkerPoolSize) message; + adjustDesignatedNumberOfWorkers(msg.numberOfWorkers()); + } + else if (message instanceof RemoveResource) { + RemoveResource msg = (RemoveResource) message; + removeRegisteredResource(msg.resourceId()); + } + + // --- lookup of registered resources + + else if (message instanceof RegisterResource) { + RegisterResource msg = (RegisterResource) message; + handleRegisterResource(sender(), msg.getTaskManager(), msg.getRegisterMessage()); + } + + // --- messages about JobManager leader status and registration + + else if (message instanceof NewLeaderAvailable) { + NewLeaderAvailable msg = (NewLeaderAvailable) message; + newJobManagerLeaderAvailable(msg.leaderAddress(), msg.leaderSessionId()); + } + else if (message instanceof TriggerRegistrationAtJobManager) { + TriggerRegistrationAtJobManager msg = (TriggerRegistrationAtJobManager) message; + triggerConnectingToJobManager(msg.jobManagerAddress()); + } + else if (message instanceof RegisterResourceManagerSuccessful) { + RegisterResourceManagerSuccessful msg = (RegisterResourceManagerSuccessful) message; + jobManagerLeaderConnected(msg.jobManager(), msg.currentlyRegisteredTaskManagers()); + } + + // --- end of application + + else if (message instanceof StopCluster) { + StopCluster msg = (StopCluster) message; + shutdownCluster(msg.finalStatus(), msg.message()); + } + + // --- miscellaneous messages + + else if (message instanceof RegisterInfoMessageListener) { + if (jobManager != null) { + infoMessageListeners.add(sender()); + sender().tell(decorateMessage( + RegisterInfoMessageListenerSuccessful.get()), + // answer as the JobManager + jobManager); + } + } + + else if (message instanceof UnRegisterInfoMessageListener) { + infoMessageListeners.remove(sender()); + } + + // --- unknown messages + + else { + LOG.error("Discarding unknown message: {}", message); + } + } + catch (Throwable t) { + // fatal error, needs master recovery + fatalError("Error processing actor message", t); + } + } + + @Override + protected final UUID getLeaderSessionID() { + return leaderSessionID; + } + + // ------------------------------------------------------------------------ + // Status + // ------------------------------------------------------------------------ + + /** + * Gets the current designated worker pool size, meaning the number of workers + * that the resource master strives to maintain. The actual number of workers + * may be lower (if worker requests are still pending) or higher (if workers have + * not yet been released). + * + * @return The designated worker pool size. + */ + public int getDesignatedWorkerPoolSize() { + return designatedPoolSize; + } + + /** + * Gets the number of currently registered TaskManagers. + * + * @return The number of currently registered TaskManagers. + */ + public int getNumberOfRegisteredTaskManagers() { + return registeredWorkers.size(); + } + + /** + * Gets the currently registered resources. + * @return + */ + public Collection<WorkerType> getRegisteredTaskManagers() { + return registeredWorkers.values(); + } + + /** + * Gets the registered worker for a given resource ID, if one is available. + * + * @param resourceId The resource ID for the worker. + * @return True if already registered, otherwise false + */ + public boolean isRegistered(ResourceID resourceId) { + return registeredWorkers.containsKey(resourceId); + } + + /** + * Gets an iterable for all currently registered TaskManagers. + * + * @return All currently registered TaskManagers. + */ + public Collection<WorkerType> allRegisteredWorkers() { + return registeredWorkers.values(); + } + + /** + * Register a resource on which a TaskManager has been started + * @param jobManager The sender (JobManager) of the message + * @param taskManager The task manager who wants to register + * @param msg The task manager's registration message + */ + private void handleRegisterResource(ActorRef jobManager, ActorRef taskManager, + RegistrationMessages.RegisterTaskManager msg) { + + ResourceID resourceID = msg.resourceId(); + try { + Preconditions.checkNotNull(resourceID); + WorkerType newWorker = workerRegistered(msg.resourceId()); + WorkerType oldWorker = registeredWorkers.put(resourceID, newWorker); + if (oldWorker != null) { + LOG.warn("Worker {} had been registered before.", resourceID); + } + jobManager.tell(decorateMessage( + new RegisterResourceSuccessful(taskManager, msg)), + self()); + } catch (Exception e) { + // This may happen on duplicate task manager registration message to the job manager + LOG.warn("TaskManager resource registration failed for {}", resourceID); + + // tell the JobManager about the failure + String eStr = ExceptionUtils.stringifyException(e); + sender().tell(decorateMessage( + new RegisterResourceFailed(taskManager, resourceID, eStr)), self()); + } + } + + /** + * Releases the given resource. Note that this does not automatically shrink + * the designated worker pool size. + * + * @param resourceId The TaskManager's resource id. + */ + private void removeRegisteredResource(ResourceID resourceId) { + + WorkerType worker = registeredWorkers.remove(resourceId); + if (worker != null) { + releaseRegisteredWorker(worker); + } else { + LOG.warn("Resource {} could not be released", resourceId); + } + } + + + // ------------------------------------------------------------------------ + // Registration and consolidation with JobManager Leader + // ------------------------------------------------------------------------ + + /** + * Called as soon as we discover (via leader election) that a JobManager lost leadership + * or a different one gained leadership. + * + * @param leaderAddress The address (Akka URL) of the new leader. Null if there is currently no leader. + * @param leaderSessionID The unique session ID marking the leadership session. + */ + protected void newJobManagerLeaderAvailable(String leaderAddress, UUID leaderSessionID) { + log.debug("Received new leading JobManager {}. Connecting.", leaderAddress); + + // disconnect from the current leader (no-op if no leader yet) + jobManagerLostLeadership(); + + // a null leader address means that only a leader disconnect + // happened, without a new leader yet + if (leaderAddress != null) { + // the leaderSessionID implicitly filters out success and failure messages + // that come after leadership changed again + this.leaderSessionID = leaderSessionID; + triggerConnectingToJobManager(leaderAddress); + } + } + + /** + * Causes the resource manager to announce itself at the new leader JobManager and + * obtains its connection information and currently known TaskManagers. + * + * @param leaderAddress The akka actor URL of the new leader JobManager. + */ + private void triggerConnectingToJobManager(String leaderAddress) { + LOG.info("Trying to associate with JobManager leader " + leaderAddress); + + final Object registerMessage = decorateMessage(new RegisterResourceManager(self())); + final Object retryMessage = decorateMessage(new TriggerRegistrationAtJobManager(leaderAddress)); + + // send the registration message to the JobManager + ActorSelection jobManagerSel = context().actorSelection(leaderAddress); + Future<Object> future = Patterns.ask(jobManagerSel, registerMessage, new Timeout(messageTimeout)); + + future.onComplete(new OnComplete<Object>() { + + @Override + public void onComplete(Throwable failure, Object msg) { + if (msg != null) { + if (msg instanceof LeaderSessionMessage && + ((LeaderSessionMessage) msg).message() instanceof RegisterResourceManagerSuccessful) + { + self().tell(msg, ActorRef.noSender()); + } + else { + LOG.error("Invalid response type to registration at JobManager: {}", msg); + self().tell(retryMessage, ActorRef.noSender()); + } + } + else { + // no success + LOG.error("Resource manager could not register at JobManager", failure); + self().tell(retryMessage, ActorRef.noSender()); + } + } + + }, context().dispatcher()); + } + + /** + * This method disassociates from the current leader JobManager. All currently registered + * TaskManagers are put under "awaiting registration". + */ + private void jobManagerLostLeadership() { + if (jobManager != null) { + LOG.info("Associated JobManager {} lost leader status", jobManager); + + jobManager = null; + leaderSessionID = null; + + infoMessageListeners.clear(); + + registeredWorkers.clear(); + } + } + + /** + * Callback when we're informed about a new leading JobManager. + * @param newJobManagerLeader The ActorRef of the new jobManager + * @param workers The existing workers the JobManager has registered. + */ + private void jobManagerLeaderConnected( + ActorRef newJobManagerLeader, + Collection<ResourceID> workers) { + + if (jobManager == null) { + LOG.info("Resource Manager associating with leading JobManager {} - leader session {}", + newJobManagerLeader, leaderSessionID); + + jobManager = newJobManagerLeader; + + // inform the framework that we have updated the leader + leaderUpdated(); + + if (workers.size() > 0) { + LOG.info("Received TaskManagers that were registered at the leader JobManager. " + + "Trying to consolidate."); + + // keep track of which TaskManagers are not handled + List<ResourceID> toHandle = new ArrayList<>(workers.size()); + for (ResourceID resourceID : workers) { + toHandle.add(resourceID); + } --- End diff -- I didn't see where the `add` method grows the ArrayList in this case where the list is already correctly sized. > ResourceManager runtime components > ---------------------------------- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Affects Versions: 1.1.0 > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)