[ 
https://issues.apache.org/jira/browse/FLINK-1984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15422829#comment-15422829
 ] 

ASF GitHub Bot commented on FLINK-1984:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2315#discussion_r74949114
  
    --- Diff: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java
 ---
    @@ -0,0 +1,618 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.mesos.runtime.clusterframework;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +
    +import org.apache.curator.framework.CuratorFramework;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.configuration.IllegalConfigurationException;
    +import org.apache.flink.mesos.cli.FlinkMesosSessionCli;
    +import 
org.apache.flink.mesos.runtime.clusterframework.store.MesosWorkerStore;
    +import 
org.apache.flink.mesos.runtime.clusterframework.store.StandaloneMesosWorkerStore;
    +import 
org.apache.flink.mesos.runtime.clusterframework.store.ZooKeeperMesosWorkerStore;
    +import org.apache.flink.mesos.util.MesosArtifactServer;
    +import org.apache.flink.mesos.util.MesosConfiguration;
    +import org.apache.flink.mesos.util.ZooKeeperUtils;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
    +import org.apache.flink.runtime.jobmanager.JobManager;
    +import org.apache.flink.runtime.jobmanager.MemoryArchivist;
    +import org.apache.flink.runtime.jobmanager.RecoveryMode;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.process.ProcessReaper;
    +import org.apache.flink.runtime.taskmanager.TaskManager;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.runtime.util.SignalHandler;
    +import org.apache.flink.runtime.webmonitor.WebMonitor;
    +
    +import org.apache.hadoop.security.UserGroupInformation;
    +
    +import org.apache.mesos.Protos;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import scala.Option;
    +import scala.concurrent.duration.Duration;
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.net.InetAddress;
    +import java.net.URL;
    +import java.security.PrivilegedAction;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +import static org.apache.flink.mesos.Utils.uri;
    +import static org.apache.flink.mesos.Utils.variable;
    +
    +/**
    + * This class is the executable entry point for the Mesos Application 
Master.
    + * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
    + * and {@link MesosFlinkResourceManager}.
    + *
    + * The JobManager handles Flink job execution, while the 
MesosFlinkResourceManager handles container
    + * allocation and failure detection.
    + */
    +public class MesosApplicationMasterRunner {
    +   /** Logger */
    +   protected static final Logger LOG = 
LoggerFactory.getLogger(MesosApplicationMasterRunner.class);
    +
    +   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
    +    * before they quit */
    +   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
    +
    +   /** The process environment variables */
    +   private static final Map<String, String> ENV = System.getenv();
    +
    +   /** The exit code returned if the initialization of the application 
master failed */
    +   private static final int INIT_ERROR_EXIT_CODE = 31;
    +
    +   /** The exit code returned if the process exits because a critical 
actor died */
    +   private static final int ACTOR_DIED_EXIT_CODE = 32;
    +
    +   // 
------------------------------------------------------------------------
    +   //  Program entry point
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * The entry point for the Mesos AppMaster.
    +    *
    +    * @param args The command line arguments.
    +    */
    +   public static void main(String[] args) {
    +           EnvironmentInformation.logEnvironmentInfo(LOG, "Mesos 
AppMaster", args);
    +           SignalHandler.register(LOG);
    +
    +           // run and exit with the proper return code
    +           int returnCode = new MesosApplicationMasterRunner().run(args);
    +           System.exit(returnCode);
    +   }
    +
    +   /**
    +    * The instance entry point for the Mesos AppMaster. Obtains user group
    +    * information and calls the main work method {@link #runPrivileged()} 
as a
    +    * privileged action.
    +    *
    +    * @param args The command line arguments.
    +    * @return The process exit code.
    +    */
    +   protected int run(String[] args) {
    +           try {
    +                   LOG.debug("All environment variables: {}", ENV);
    +
    +                   final UserGroupInformation currentUser;
    +                   try {
    +                           currentUser = 
UserGroupInformation.getCurrentUser();
    +                   } catch (Throwable t) {
    +                           throw new Exception("Cannot access 
UserGroupInformation information for current user", t);
    +                   }
    +
    +                   LOG.info("Running Flink as user {}", 
currentUser.getShortUserName());
    +
    +                   // run the actual work in a secured privileged action
    +                   return currentUser.doAs(new PrivilegedAction<Integer>() 
{
    +                           @Override
    +                           public Integer run() {
    +                                   return runPrivileged();
    +                           }
    +                   });
    +           }
    +           catch (Throwable t) {
    +                   // make sure that everything whatever ends up in the log
    +                   LOG.error("Mesos AppMaster initialization failed", t);
    +                   return INIT_ERROR_EXIT_CODE;
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Core work method
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * The main work method, must run as a privileged action.
    +    *
    +    * @return The return code for the Java process.
    +    */
    +   protected int runPrivileged() {
    +
    +           ActorSystem actorSystem = null;
    +           WebMonitor webMonitor = null;
    +           MesosArtifactServer artifactServer = null;
    +
    +           try {
    +                   // ------- (1) load and parse / validate all 
configurations -------
    +
    +                   // loading all config values here has the advantage 
that the program fails fast, if any
    +                   // configuration problem occurs
    +
    +                   final String workingDir = 
ENV.get(MesosConfigKeys.ENV_MESOS_SANDBOX);
    +                   require(workingDir != null, "Sandbox directory variable 
(%s) not set", MesosConfigKeys.ENV_MESOS_SANDBOX);
    +
    +                   final String sessionID = 
ENV.get(MesosConfigKeys.ENV_SESSION_ID);
    +                   require(sessionID != null, "Session ID (%s) not set", 
MesosConfigKeys.ENV_SESSION_ID);
    +
    +                   // Note that we use the "appMasterHostname" given by 
the system, to make sure
    +                   // we use the hostnames consistently throughout akka.
    +                   // for akka "localhost" and "localhost.localdomain" are 
different actors.
    +                   final String appMasterHostname = 
InetAddress.getLocalHost().getHostName();
    +
    +                   // Flink configuration
    +                   final Configuration dynamicProperties =
    +                           
FlinkMesosSessionCli.decodeDynamicProperties(ENV.get(MesosConfigKeys.ENV_DYNAMIC_PROPERTIES));
    +                   LOG.debug("Mesos dynamic properties: {}", 
dynamicProperties);
    +
    +                   final Configuration config = 
createConfiguration(workingDir, dynamicProperties);
    +
    +                   // Mesos configuration
    +                   final MesosConfiguration mesosConfig = 
createMesosConfig(config, appMasterHostname);
    +
    +                   // environment values related to TM
    +                   final int taskManagerContainerMemory;
    +                   final int numInitialTaskManagers;
    +                   final int slotsPerTaskManager;
    +
    +                   try {
    +                           taskManagerContainerMemory = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_MEMORY));
    +                   } catch (NumberFormatException e) {
    +                           throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_TM_MEMORY + " : "
    +                                   + e.getMessage());
    +                   }
    +                   try {
    +                           numInitialTaskManagers = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_TM_COUNT));
    +                   } catch (NumberFormatException e) {
    +                           throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_TM_COUNT + " : "
    +                                   + e.getMessage());
    +                   }
    +                   try {
    +                           slotsPerTaskManager = 
Integer.parseInt(ENV.get(MesosConfigKeys.ENV_SLOTS));
    +                   } catch (NumberFormatException e) {
    +                           throw new RuntimeException("Invalid value for " 
+ MesosConfigKeys.ENV_SLOTS + " : "
    +                                   + e.getMessage());
    +                   }
    +
    +                   final ContaineredTaskManagerParameters 
containeredParameters =
    +                           ContaineredTaskManagerParameters.create(config, 
taskManagerContainerMemory, slotsPerTaskManager);
    +
    +                   final MesosTaskManagerParameters taskManagerParameters =
    +                           MesosTaskManagerParameters.create(config, 
containeredParameters);
    +
    +                   LOG.info("TaskManagers will be created with {} task 
slots",
    +                           
taskManagerParameters.containeredParameters().numSlots());
    +                   LOG.info("TaskManagers will be started with container 
size {} MB, JVM heap size {} MB, " +
    +                                   "JVM direct memory limit {} MB, {} 
cpus",
    +                           
taskManagerParameters.containeredParameters().taskManagerTotalMemoryMB(),
    +                           
taskManagerParameters.containeredParameters().taskManagerHeapSizeMB(),
    +                           
taskManagerParameters.containeredParameters().taskManagerDirectMemoryLimitMB(),
    +                           taskManagerParameters.cpus());
    +
    +                   // JM endpoint, which should be explicitly configured 
by the dispatcher (based on acquired net resources)
    +                   final int listeningPort = 
config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY,
    +                           ConfigConstants.DEFAULT_JOB_MANAGER_IPC_PORT);
    +                   require(listeningPort >= 0 && listeningPort <= 65536, 
"Config parameter \"" +
    +                           ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + "\" 
is invalid, it must be between 0 and 65536");
    +
    +                   // ----------------- (2) start the actor system 
-------------------
    +
    +                   // try to start the actor system, JobManager and 
JobManager actor system
    +                   // using the configured address and ports
    +                   actorSystem = BootstrapTools.startActorSystem(config, 
appMasterHostname, listeningPort, LOG);
    +
    +                   final String akkaHostname = 
AkkaUtils.getAddress(actorSystem).host().get();
    +                   final int akkaPort = (Integer) 
AkkaUtils.getAddress(actorSystem).port().get();
    +
    +                   LOG.info("Actor system bound to hostname {}.", 
akkaHostname);
    +
    +                   // try to start the artifact server
    +                   LOG.debug("Starting Artifact Server");
    +                   final int artifactServerPort = 
config.getInteger(ConfigConstants.MESOS_ARTIFACT_SERVER_PORT_KEY,
    +                           
ConfigConstants.DEFAULT_MESOS_ARTIFACT_SERVER_PORT);
    +                   artifactServer = new MesosArtifactServer(sessionID, 
akkaHostname, artifactServerPort);
    +
    +                   // ----------------- (3) Generate the configuration for 
the TaskManagers -------------------
    +
    +                   final Configuration taskManagerConfig = 
BootstrapTools.generateTaskManagerConfiguration(
    +                           config, akkaHostname, akkaPort, 
slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
    +                   LOG.debug("TaskManager configuration: {}", 
taskManagerConfig);
    +
    +                   final Protos.TaskInfo.Builder taskManagerContext = 
createTaskManagerContext(
    +                           config, mesosConfig, ENV,
    +                           taskManagerParameters, taskManagerConfig,
    +                           workingDir, getTaskManagerClass(), 
artifactServer, LOG);
    +
    +                   // ----------------- (4) start the actors 
-------------------
    +
    +                   // 1) JobManager & Archive (in non-HA case, the leader 
service takes this)
    +                   // 2) Web Monitor (we need its port to register)
    +                   // 3) Resource Master for Mesos
    +                   // 4) Process reapers for the JobManager and Resource 
Master
    +
    +                   // 1: the JobManager
    +                   LOG.debug("Starting JobManager actor");
    +
    +                   // we start the JobManager with its standard name
    +                   ActorRef jobManager = JobManager.startJobManagerActors(
    +                           config, actorSystem,
    +                           new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
    +                           scala.Option.<String>empty(),
    +                           getJobManagerClass(),
    +                           getArchivistClass())._1();
    +
    +
    +                   // 2: the web monitor
    +                   LOG.debug("Starting Web Frontend");
    +
    +                   webMonitor = 
BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, 
LOG);
    +                   if(webMonitor != null) {
    +                           final URL webMonitorURL = new URL("http", 
appMasterHostname, webMonitor.getServerPort(), "/");
    +                           
mesosConfig.frameworkInfo().setWebuiUrl(webMonitorURL.toExternalForm());
    +                   }
    +
    +                   // 3: Flink's Mesos ResourceManager
    +                   LOG.debug("Starting Mesos Flink Resource Manager");
    +
    +                   // create the worker store to persist task information 
across restarts
    +                   MesosWorkerStore workerStore = 
createWorkerStore(config);
    +
    +                   // we need the leader retrieval service here to be 
informed of new
    +                   // leader session IDs, even though there can be only 
one leader ever
    +                   LeaderRetrievalService leaderRetriever =
    +                           
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
    +
    +                   Props resourceMasterProps = 
MesosFlinkResourceManager.createActorProps(
    +                           getResourceManagerClass(),
    +                           config,
    +                           mesosConfig,
    +                           workerStore,
    +                           leaderRetriever,
    +                           taskManagerParameters,
    +                           taskManagerContext,
    +                           numInitialTaskManagers,
    +                           LOG);
    +
    +                   ActorRef resourceMaster = 
actorSystem.actorOf(resourceMasterProps, "Mesos_Resource_Master");
    +
    +
    +                   // 4: Process reapers
    +                   // The process reapers ensure that upon unexpected 
actor death, the process exits
    +                   // and does not stay lingering around unresponsive
    +
    +                   LOG.debug("Starting process reapers for JobManager");
    +
    +                   actorSystem.actorOf(
    +                           Props.create(ProcessReaper.class, 
resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
    +                           "Mesos_Resource_Master_Process_Reaper");
    +
    +                   actorSystem.actorOf(
    +                           Props.create(ProcessReaper.class, jobManager, 
LOG, ACTOR_DIED_EXIT_CODE),
    +                           "JobManager_Process_Reaper");
    +           }
    +           catch (Throwable t) {
    +                   // make sure that everything whatever ends up in the log
    +                   LOG.error("Mesos JobManager initialization failed", t);
    +
    +                   if (actorSystem != null) {
    +                           try {
    +                                   actorSystem.shutdown();
    +                           } catch (Throwable tt) {
    +                                   LOG.error("Error shutting down actor 
system", tt);
    +                           }
    +                   }
    +
    +                   if (webMonitor != null) {
    --- End diff --
    
    The web monitor depends on the actor system (it holds some ActorRefs). 
Consequently, it should be stopped before the actor system is shut down.


> Integrate Flink with Apache Mesos
> ---------------------------------
>
>                 Key: FLINK-1984
>                 URL: https://issues.apache.org/jira/browse/FLINK-1984
>             Project: Flink
>          Issue Type: New Feature
>          Components: Cluster Management
>            Reporter: Robert Metzger
>            Assignee: Eron Wright 
>            Priority: Minor
>         Attachments: 251.patch
>
>
> There are some users asking for an integration of Flink into Mesos.
> -There also is a pending pull request for adding Mesos support for Flink-: 
> https://github.com/apache/flink/pull/251
> Update (May '16):  a new effort is now underway, building on the recent 
> ResourceManager work.
> Design document:  ([google 
> doc|https://docs.google.com/document/d/1WItafBmGbjlaBbP8Of5PAFOH9GUJQxf5S4hjEuPchuU/edit?usp=sharing])



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to