[ https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422832#comment-15422832 ]
ASF GitHub Bot commented on FLINK-1984: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2315#discussion_r74949669 --- Diff: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java --- @@ -0,0 +1,618 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.mesos.runtime.clusterframework; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.IllegalConfigurationException; +import org.apache.flink.mesos.cli.FlinkMesosSessionCli; +import org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore; +import org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore; +import org.apache.flink.mesos.util.MesosArtifactServer; +import org.apache.flink.mesos.util.MesosConfiguration; +import org.apache.flink.mesos.util.ZooKeeperUtils; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.jobmanager.RecoveryMode; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.security.UserGroupInformation; + +import org.apache.mesos.Protos; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.Option; +import scala.concurrent.duration.Duration; +import scala.concurrent.duration.FiniteDuration; + 
+import java.io.File; +import java.net.InetAddress; +import java.net.URL; +import java.security.PrivilegedAction; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.flink.mesos.Utils.uri; +import static org.apache.flink.mesos.Utils.variable; + +/** + * This class is the executable entry point for the Mesos Application Master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link MesosFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the MesosFlinkResourceManager handles container + * allocation and failure detection. + */ +public class MesosApplicationMasterRunner { + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(MesosApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, + * before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + // ------------------------------------------------------------------------ + // Program entry point + // ------------------------------------------------------------------------ + + /** + * The entry point for the Mesos AppMaster. + * + * @param args The command line arguments. 
+ */ + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos AppMaster", args); + SignalHandler.register(LOG); + + // run and exit with the proper return code + int returnCode = new MesosApplicationMasterRunner().run(args); + System.exit(returnCode); + } + + /** + * The instance entry point for the Mesos AppMaster. Obtains user group + * information and calls the main work method {@link #runPrivileged()} as a + * privileged action. + * + * @param args The command line arguments. + * @return The process exit code. + */ + protected int run(String[] args) { + try { + LOG.debug("All environment variables: {}", ENV); + + final UserGroupInformation currentUser; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (Throwable t) { + throw new Exception("Cannot access UserGroupInformation information for current user", t); + } + + LOG.info("Running Flink as user {}", currentUser.getShortUserName()); + + // run the actual work in a secured privileged action + return currentUser.doAs(new PrivilegedAction<Integer>() { + @Override + public Integer run() { + return runPrivileged(); + } + }); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("Mesos AppMaster initialization failed", t); + return INIT_ERROR_EXIT_CODE; + } + } + + // ------------------------------------------------------------------------ + // Core work method + // ------------------------------------------------------------------------ + + /** + * The main work method, must run as a privileged action. + * + * @return The return code for the Java process. 
+ */ + protected int runPrivileged() { + + ActorSystem actorSystem = null; + WebMonitor webMonitor = null; + MesosArtifactServer artifactServer = null; + + try { + // ------- (1) load and parse / validate all configurations ------- + + // loading all config values here has the advantage that the program fails fast, if any + // configuration problem occurs + + final String workingDir = ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX); + require(workingDir != null, "Sandbox directory variable (%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX); + + final String sessionID = ENV.get(MesosConfigKeys.ENV_SESSION_ID); + require(sessionID != null, "Session ID (%s) not set", MesosConfigKeys.ENV_SESSION_ID); + + // Note that we use the "appMasterHostname" given by the system, to make sure + // we use the hostnames consistently throughout akka. + // for akka "localhost" and "localhost.localdomain" are different actors. + final String appMasterHostname = InetAddress.getLocalHost().getHostName(); + + // Flink configuration + final Configuration dynamicProperties = + FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("Mesos dynamic properties: {}", dynamicProperties); + + final Configuration config = createConfiguration(workingDir, dynamicProperties); + + // Mesos configuration + final MesosConfiguration mesosConfig = createMesosConfig(config, appMasterHostname); + + // environment values related to TM + final int taskManagerContainerMemory; + final int numInitialTaskManagers; + final int slotsPerTaskManager; + + try { + taskManagerContainerMemory = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_TM_MEMORY + " : " + + e.getMessage()); + } + try { + numInitialTaskManagers = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for 
" + MesosConfigKeys.ENV_TM_COUNT + " : " + + e.getMessage()); + } + try { + slotsPerTaskManager = Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + MesosConfigKeys.ENV_SLOTS + " : " + + e.getMessage()); + } + + final ContaineredTaskManagerParameters containeredParameters = + ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager); + + final MesosTaskManagerParameters taskManagerParameters = + MesosTaskManagerParameters.create(config, containeredParameters); + + LOG.info("TaskManagers will be created with {} task slots", + taskManagerParameters.containeredParameters().numSlots()); + LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " + + "JVM direct memory limit {} MB, {} cpus", + taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(), + taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(), + taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(), + taskManagerParameters.cpus()); + + // JM endpoint, which should be explicitly configured by the dispatcher (based on acquired net resources) + final int listeningPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, + ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT); + require(listeningPort >= 0 && listeningPort <= 65536, "Config parameter \"" + + ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" is invalid, it must be between 0 and 65536"); + + // ----------------- (2) start the actor system ------------------- + + // try to start the actor system, JobManager and JobManager actor system + // using the configured address and ports + actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, listeningPort, LOG); + + final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get(); + final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get(); + + 
LOG.info("Actor system bound to hostname {}.", akkaHostname); + + // try to start the artifact server + LOG.debug("Starting Artifact Server"); + final int artifactServerPort = config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY, + ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT); + artifactServer = new MesosArtifactServer(sessionID, akkaHostname, artifactServerPort); + + // ----------------- (3) Generate the configuration for the TaskManagers ------------------- + + final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( + config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT); + LOG.debug("TaskManager configuration: {}", taskManagerConfig); + + final Protos.TaskInfo.Builder taskManagerContext = createTaskManagerContext( + config, mesosConfig, ENV, + taskManagerParameters, taskManagerConfig, + workingDir, getTaskManagerClass(), artifactServer, LOG); + + // ----------------- (4) start the actors ------------------- + + // 1) JobManager & Archive (in non-HA case, the leader service takes this) + // 2) Web Monitor (we need its port to register) + // 3) Resource Master for Mesos + // 4) Process reapers for the JobManager and Resource Master + + // 1: the JobManager + LOG.debug("Starting JobManager actor"); + + // we start the JobManager with its standard name + ActorRef jobManager = JobManager.startJobManagerActors( + config, actorSystem, + new scala.Some<>(JobManager.JOB_MANAGER_NAME()), + scala.Option.<String>empty(), + getJobManagerClass(), + getArchivistClass())._1(); + + + // 2: the web monitor + LOG.debug("Starting Web Frontend"); + + webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG); + if(webMonitor != null) { + final URL webMonitorURL = new URL("http", appMasterHostname, webMonitor.getServerPort(), "/"); + mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm()); + } + + // 3: Flink's Mesos ResourceManager + 
LOG.debug("Starting Mesos Flink Resource Manager"); + + // create the worker store to persist task information across restarts + MesosWorkerStore workerStore = createWorkerStore(config); + + // we need the leader retrieval service here to be informed of new + // leader session IDs, even though there can be only one leader ever + LeaderRetrievalService leaderRetriever = + LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager); + + Props resourceMasterProps = MesosFlinkResourceManager.createActorProps( + getResourceManagerClass(), + config, + mesosConfig, + workerStore, + leaderRetriever, + taskManagerParameters, + taskManagerContext, + numInitialTaskManagers, + LOG); + + ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master"); + + + // 4: Process reapers + // The process reapers ensure that upon unexpected actor death, the process exits + // and does not stay lingering around unresponsive + + LOG.debug("Starting process reapers for JobManager"); + + actorSystem.actorOf( + Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE), + "Mesos_Resource_Master_Process_Reaper"); + + actorSystem.actorOf( + Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE), + "JobManager_Process_Reaper"); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("Mesos JobManager initialization failed", t); + + if (actorSystem != null) { + try { + actorSystem.shutdown(); + } catch (Throwable tt) { + LOG.error("Error shutting down actor system", tt); + } + } + + if (webMonitor != null) { + try { + webMonitor.stop(); + } catch (Throwable ignored) { + LOG.warn("Failed to stop the web frontend", ignored); + } + } + + if(artifactServer != null) { + try { + artifactServer.stop(); + } catch (Throwable ignored) { + LOG.error("Failed to stop the artifact server", ignored); + } + } + + return INIT_ERROR_EXIT_CODE; + } + + // everything started, we can wait until all is 
done or the process is killed + LOG.info("Mesos JobManager started"); + + // wait until everything is done + actorSystem.awaitTermination(); + + // if we get here, everything work out jolly all right, and we even exited smoothly + if (webMonitor != null) { + try { + webMonitor.stop(); + } catch (Throwable t) { + LOG.error("Failed to stop the web frontend", t); + } + } + + try { + artifactServer.stop(); + } catch (Throwable t) { + LOG.error("Failed to stop the artifact server", t); + } + + return 0; + } + + // ------------------------------------------------------------------------ + // For testing, this allows to override the actor classes used for + // JobManager and the archive of completed jobs + // ------------------------------------------------------------------------ + + protected Class<? extends MesosFlinkResourceManager> getResourceManagerClass() { + return MesosFlinkResourceManager.class; + } + + protected Class<? extends JobManager> getJobManagerClass() { + return MesosJobManager.class; + } + + protected Class<? extends MemoryArchivist> getArchivistClass() { + return MemoryArchivist.class; + } + + protected Class<? extends TaskManager> getTaskManagerClass() { + return MesosTaskManager.class; + } + + /** + * Validates a condition, throwing a RuntimeException if the condition is violated. + * + * @param condition The condition. + * @param message The message for the runtime exception, with format variables as defined by + * {@link String#format(String, Object...)}. + * @param values The format arguments. + */ + private static void require(boolean condition, String message, Object... values) { + if (!condition) { + throw new RuntimeException(String.format(message, values)); + } + } --- End diff -- We could replace this via `o.a.f.util.Preconditions.checkArgument` or `checkState`. 
> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: https://github.com/apache/flink/pull/251
> Update (May '16): a new effort is now underway, building on the recent ResourceManager work.
> Design document: ([google doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)