[ 
https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206575#comment-15206575
 ] 

ASF GitHub Bot commented on FLINK-3544:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/1741#discussion_r57008853
  
    --- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java 
---
    @@ -0,0 +1,601 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.yarn;
    +
    +import akka.actor.ActorRef;
    +import akka.actor.ActorSystem;
    +import akka.actor.Props;
    +
    +import org.apache.flink.client.CliFrontend;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.GlobalConfiguration;
    +import org.apache.flink.runtime.akka.AkkaUtils;
    +import org.apache.flink.runtime.clusterframework.BootstrapTools;
    +import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
    +import org.apache.flink.runtime.jobmanager.JobManager;
    +import org.apache.flink.runtime.jobmanager.MemoryArchivist;
    +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
    +import org.apache.flink.runtime.process.ProcessReaper;
    +import org.apache.flink.runtime.taskmanager.TaskManager;
    +import org.apache.flink.runtime.util.EnvironmentInformation;
    +import org.apache.flink.runtime.util.LeaderRetrievalUtils;
    +import org.apache.flink.runtime.util.SignalHandler;
    +import org.apache.flink.runtime.webmonitor.WebMonitor;
    +
    +import org.apache.hadoop.fs.Path;
    +import org.apache.hadoop.io.DataOutputBuffer;
    +import org.apache.hadoop.security.Credentials;
    +import org.apache.hadoop.security.UserGroupInformation;
    +import org.apache.hadoop.security.token.Token;
    +import org.apache.hadoop.yarn.api.ApplicationConstants;
    +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
    +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
    +import org.apache.hadoop.yarn.api.records.LocalResource;
    +import org.apache.hadoop.yarn.conf.YarnConfiguration;
    +import org.apache.hadoop.yarn.util.Records;
    +
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import scala.concurrent.duration.FiniteDuration;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.nio.ByteBuffer;
    +import java.security.PrivilegedAction;
    +import java.util.Collections;
    +import java.util.HashMap;
    +import java.util.Map;
    +import java.util.UUID;
    +import java.util.concurrent.TimeUnit;
    +
    +/**
    + * This class is the executable entry point for the YARN application 
master.
    + * It starts actor system and the actors for {@link 
org.apache.flink.runtime.jobmanager.JobManager}
    + * and {@link YarnFlinkResourceManager}.
    + * 
    + * The JobManager handles Flink job execution, while the 
YarnFlinkResourceManager handles container
    + * allocation and failure detection.
    + */
    +public class YarnApplicationMasterRunner {
    +
    +   /** Logger */
    +   protected static final Logger LOG = 
LoggerFactory.getLogger(YarnApplicationMasterRunner.class);
    +
    +   /** The maximum time that TaskManagers may be waiting to register at 
the JobManager,
    +    * before they quit */
    +   private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = 
new FiniteDuration(5, TimeUnit.MINUTES);
    +   
    +   /** The process environment variables */
    +   private static final Map<String, String> ENV = System.getenv();
    +   
    +   /** The exit code returned if the initialization of the application 
master failed */
    +   private static final int INIT_ERROR_EXIT_CODE = 31;
    +
    +   /** The exit code returned if the process exits because a critical 
actor died */
    +   private static final int ACTOR_DIED_EXIT_CODE = 32;
    +
    +
    +   // 
------------------------------------------------------------------------
    +   //  Program entry point
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * The entry point for the YARN application master. 
    +    *
    +    * @param args The command line arguments.
    +    */
    +   public static void main(String[] args) {
    +           EnvironmentInformation.logEnvironmentInfo(LOG, "YARN 
ApplicationMaster / JobManager", args);
    +           SignalHandler.register(LOG);
    +
    +           // run and exit with the proper return code
    +           int returnCode = new YarnApplicationMasterRunner().run(args);
    +           System.exit(returnCode);
    +   }
    +   
    +   /**
    +    * The instance entry point for the YARN application master. Obtains 
user group
    +    * information and calls the main work method {@link 
#runApplicationMaster()} as a
    +    * privileged action.
    +    *
    +    * @param args The command line arguments.
    +    * @return The process exit code.
    +    */
    +   protected int run(String[] args) {
    +           try {
    +                   LOG.debug("All environment variables: {}", ENV);
    +                   
    +                   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME);
    +                   require(yarnClientUsername != null, "YARN client user 
name environment variable {} not set",
    +                           YarnConfigKeys.ENV_CLIENT_USERNAME);
    +           
    +                   final UserGroupInformation currentUser;
    +                   try {
    +                           currentUser = 
UserGroupInformation.getCurrentUser();
    +                   } catch (Throwable t) {
    +                           throw new Exception("Cannot access 
UserGroupInformation information for current user", t);
    +                   }
    +           
    +                   LOG.info("YARN daemon runs as user {}. Running Flink 
Application Master/JobManager as user {}",
    +                           currentUser.getShortUserName(), 
yarnClientUsername);
    +   
    +                   UserGroupInformation ugi = 
UserGroupInformation.createRemoteUser(yarnClientUsername);
    +                   
    +                   // transfer all security tokens, for example for 
authenticated HDFS and HBase access
    +                   for (Token<?> token : currentUser.getTokens()) {
    +                           ugi.addToken(token);
    +                   }
    +   
    +                   // run the actual work in a secured privileged action
    +                   return ugi.doAs(new PrivilegedAction<Integer>() {
    +                           @Override
    +                           public Integer run() {
    +                                   return runApplicationMaster();
    +                           }
    +                   });
    +           }
    +           catch (Throwable t) {
    +                   // make sure that everything whatever ends up in the log
    +                   LOG.error("YARN Application Master initialization 
failed", t);
    +                   return INIT_ERROR_EXIT_CODE;
    +           }
    +   }
    +
    +   // 
------------------------------------------------------------------------
    +   //  Core work method
    +   // 
------------------------------------------------------------------------
    +
    +   /**
    +    * The main work method, must run as a privileged action.
    +    * 
    +    * @return The return code for the Java process. 
    +    */
    +   protected int runApplicationMaster() {
    +           ActorSystem actorSystem = null;
    +           WebMonitor webMonitor = null;
    +           
    +           try {
    +                   // ------- (1) load and parse / validate all 
configurations -------
    +                   
    +                   // loading all config values here has the advantage 
that the program fails fast, if any
    +                   // configuration problem occurs
    +                   
    +                   final String currDir = ENV.get(Environment.PWD.key());
    +                   require(currDir != null, "Current working directory 
variable (%s) not set", Environment.PWD.key());
    +   
    +                   // Note that we use the "appMasterHostname" given by 
YARN here, to make sure
    +                   // we use the hostnames given by YARN consistently 
throughout akka.
    +                   // for akka "localhost" and "localhost.localdomain" are 
different actors.
    +                   final String appMasterHostname = 
ENV.get(Environment.NM_HOST.key());
    +                   require(appMasterHostname != null,
    +                           "ApplicationMaster hostname variable %s not 
set", Environment.NM_HOST.key());
    +   
    +                   LOG.info("YARN assigned hostname for application 
master: {}", appMasterHostname);
    +                   
    +                   // Flink configuration
    +                   final Map<String, String> dynamicProperties =
    +                           
CliFrontend.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES));
    +                   LOG.debug("YARN dynamic properties: {}", 
dynamicProperties);
    +                   
    +                   final Configuration config = 
createConfiguration(currDir, dynamicProperties);
    +                   
    +                   // Hadoop/Yarn configuration (loads config data 
automatically from classpath files)
    +                   final YarnConfiguration yarnConfig = new 
YarnConfiguration();
    +                   
    +                   final int taskManagerContainerMemory;
    +                   final int numInitialTaskManagers;
    +                   final int slotsPerTaskManager;
    +
    +                   try {
    +                           taskManagerContainerMemory = 
Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_MEMORY));
    +                   } catch (NumberFormatException e) {
    +                           throw new RuntimeException("Invalid value for " 
+ YarnConfigKeys.ENV_TM_MEMORY + " : "
    +                                   + e.getMessage());
    +                   }
    +                   try {
    +                           numInitialTaskManagers = 
Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_COUNT));
    +                   } catch (NumberFormatException e) {
    +                           throw new RuntimeException("Invalid value for " 
+ YarnConfigKeys.ENV_TM_COUNT + " : "
    +                                   + e.getMessage());
    +                   }
    +                   try {
    +                           slotsPerTaskManager = 
Integer.parseInt(ENV.get(YarnConfigKeys.ENV_SLOTS));
    +                   } catch (NumberFormatException e) {
    +                           throw new RuntimeException("Invalid value for " 
+ YarnConfigKeys.ENV_SLOTS + " : "
    +                                   + e.getMessage());
    +                   }
    +                   
    +                   final ContaineredTaskManagerParameters 
taskManagerParameters =
    +                           ContaineredTaskManagerParameters.create(config, 
taskManagerContainerMemory, slotsPerTaskManager);
    +
    +                   LOG.info("TaskManagers will be created with {} task 
slots", taskManagerParameters.numSlots());
    +                   LOG.info("TaskManagers will be started with container 
size {} MB, JVM heap size {} MB, " +
    +                           "JVM direct memory limit {} MB",
    +                           
taskManagerParameters.taskManagerTotalMemoryMB(),
    +                           taskManagerParameters.taskManagerHeapSizeMB(),
    +                           
taskManagerParameters.taskManagerDirectMemoryLimitMB());
    +                   
    +                   
    +                   // ----------------- (2) start the actor system 
-------------------
    +                   
    +                   // try to start the actor system, JobManager and 
JobManager actor system
    +                   // using the port range definition from the config.
    +                   final String amPortRange = config.getString(
    +                                   
ConfigConstants.YARN_APPLICATION_MASTER_PORT,
    +                                   
ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT);
    +                   
    +                   actorSystem = BootstrapTools.startActorSystem(config, 
appMasterHostname, amPortRange, LOG);
    +                   
    +                   final String akkaHostname = 
AkkaUtils.getAddress(actorSystem).host().get();
    +                   final int akkaPort = (Integer) 
AkkaUtils.getAddress(actorSystem).port().get();
    +
    +                   LOG.info("Actor system bound to hostname {}.", 
akkaHostname);
    +                   
    +                   
    +                   // ---- (3) Generate the configuration for the 
TaskManagers
    +                   
    +                   final Configuration taskManagerConfig = 
BootstrapTools.generateTaskManagerConfiguration(
    +                                   config, akkaHostname, akkaPort, 
slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT);
    +                   LOG.debug("TaskManager configuration: {}", 
taskManagerConfig);
    +                   
    +                   final ContainerLaunchContext taskManagerContext = 
createTaskManagerContext(
    +                           config, yarnConfig, ENV,
    +                           taskManagerParameters, taskManagerConfig,
    +                           currDir, getTaskManagerClass(), LOG);
    +                   
    +                   
    +                   // ---- (4) start the actors and components in this 
order:
    +                   
    +                   // 1) JobManager & Archive (in non-HA case, the leader 
service takes this)
    +                   // 2) Web Monitor (we need its port to register)
    +                   // 3) Resource Master for YARN
    +                   // 4) Process reapers for the JobManager and Resource 
Master
    +
    +                   
    +                   // 1: the JobManager
    +                   LOG.debug("Starting JobManager actor");
    +
    +                   // we start the JobManager with its standard name
    +                   ActorRef jobManager = JobManager.startJobManagerActors(
    +                           config, actorSystem,
    +                           new scala.Some<>(JobManager.JOB_MANAGER_NAME()),
    +                           scala.Option.<String>empty(),
    +                           getJobManagerClass(),
    +                           getArchivistClass())._1();
    +
    +
    +                   // 2: the web monitor
    +                   LOG.debug("Starting Web Frontend");
    +
    +                   webMonitor = 
BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, 
LOG);
    +                   final String webMonitorURL = webMonitor == null ? null :
    +                           "http://" + appMasterHostname + ":" + 
webMonitor.getServerPort();
    +
    +                   // 3: Flink's Yarn resource manager
    +                   LOG.debug("Starting YARN Flink Resource Manager");
    +
    +                   // we need the leader retrieval service here to be 
informed of new
    +                   // leader session IDs, even though there can be only 
one leader ever
    +                   LeaderRetrievalService leaderRetriever = 
    +                           
LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager);
    +                   
    +                   Props resourceMasterProps = 
YarnFlinkResourceManager.createActorProps(
    +                           getResourceManagerClass(),
    +                           config,
    +                           yarnConfig,
    +                           leaderRetriever,
    +                           appMasterHostname,
    +                           webMonitorURL,
    +                           taskManagerParameters,
    +                           taskManagerContext,
    +                           numInitialTaskManagers, 
    +                           LOG);
    +                   
    +                   ActorRef resourceMaster = 
actorSystem.actorOf(resourceMasterProps);
    +                   
    +                   
    +                   // 4: Process reapers
    +                   // The process reapers ensure that upon unexpected 
actor death, the process exits
    +                   // and does not stay lingering around unresponsive
    +                   
    +                   LOG.debug("Starting process reapers for JobManager and 
YARN Application Master");
    +
    +                   actorSystem.actorOf(
    +                           Props.create(ProcessReaper.class, 
resourceMaster, LOG, ACTOR_DIED_EXIT_CODE),
    +                           "YARN_Resource_Master_Process_Reaper");
    +                   
    +                   actorSystem.actorOf(
    +                           Props.create(ProcessReaper.class, jobManager, 
LOG, ACTOR_DIED_EXIT_CODE),
    +                           "JobManager_Process_Reaper");
    +           }
    +           catch (Throwable t) {
    +                   // make sure that everything whatever ends up in the log
    +                   LOG.error("YARN Application Master initialization 
failed", t);
    +                   
    +                   if (actorSystem != null) {
    +                           try {
    +                                   actorSystem.shutdown();
    +                           } catch (Throwable tt) {
    +                                   LOG.error("Error shutting down actor 
system", tt);
    +                           }
    +                   }
    +
    +                   if (webMonitor != null) {
    +                           try {
    +                                   webMonitor.stop();
    +                           } catch (Throwable ignored) {}
    --- End diff --
    
    It would be good to log the error here instead of silently ignoring it.


> ResourceManager runtime components
> ----------------------------------
>
>                 Key: FLINK-3544
>                 URL: https://issues.apache.org/jira/browse/FLINK-3544
>             Project: Flink
>          Issue Type: Sub-task
>          Components: ResourceManager
>    Affects Versions: 1.1.0
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>             Fix For: 1.1.0
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to