[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206575#comment-15206575 ]
ASF GitHub Bot commented on FLINK-3544: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/1741#discussion_r57008853 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java --- @@ -0,0 +1,601 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.yarn; + +import akka.actor.ActorRef; +import akka.actor.ActorSystem; +import akka.actor.Props; + +import org.apache.flink.client.CliFrontend; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmanager.MemoryArchivist; +import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; +import org.apache.flink.runtime.process.ProcessReaper; +import org.apache.flink.runtime.taskmanager.TaskManager; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.LeaderRetrievalUtils; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.runtime.webmonitor.WebMonitor; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.util.Records; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import scala.concurrent.duration.FiniteDuration; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.security.PrivilegedAction; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import 
java.util.UUID; +import java.util.concurrent.TimeUnit; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmanager.JobManager} + * and {@link YarnFlinkResourceManager}. + * + * The JobManager handles Flink job execution, while the YarnFlinkResourceManager handles container + * allocation and failure detection. + */ +public class YarnApplicationMasterRunner { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnApplicationMasterRunner.class); + + /** The maximum time that TaskManagers may be waiting to register at the JobManager, + * before they quit */ + private static final FiniteDuration TASKMANAGER_REGISTRATION_TIMEOUT = new FiniteDuration(5, TimeUnit.MINUTES); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The exit code returned if the process exits because a critical actor died */ + private static final int ACTOR_DIED_EXIT_CODE = 32; + + + // ------------------------------------------------------------------------ + // Program entry point + // ------------------------------------------------------------------------ + + /** + * The entry point for the YARN application master. + * + * @param args The command line arguments. + */ + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster / JobManager", args); + SignalHandler.register(LOG); + + // run and exit with the proper return code + int returnCode = new YarnApplicationMasterRunner().run(args); + System.exit(returnCode); + } + + /** + * The instance entry point for the YARN application master. 
Obtains user group + * information and calls the main work method {@link #runApplicationMaster()} as a + * privileged action. + * + * @param args The command line arguments. + * @return The process exit code. + */ + protected int run(String[] args) { + try { + LOG.debug("All environment variables: {}", ENV); + + final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_CLIENT_USERNAME); + require(yarnClientUsername != null, "YARN client user name environment variable {} not set", + YarnConfigKeys.ENV_CLIENT_USERNAME); + + final UserGroupInformation currentUser; + try { + currentUser = UserGroupInformation.getCurrentUser(); + } catch (Throwable t) { + throw new Exception("Cannot access UserGroupInformation information for current user", t); + } + + LOG.info("YARN daemon runs as user {}. Running Flink Application Master/JobManager as user {}", + currentUser.getShortUserName(), yarnClientUsername); + + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(yarnClientUsername); + + // transfer all security tokens, for example for authenticated HDFS and HBase access + for (Token<?> token : currentUser.getTokens()) { + ugi.addToken(token); + } + + // run the actual work in a secured privileged action + return ugi.doAs(new PrivilegedAction<Integer>() { + @Override + public Integer run() { + return runApplicationMaster(); + } + }); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("YARN Application Master initialization failed", t); + return INIT_ERROR_EXIT_CODE; + } + } + + // ------------------------------------------------------------------------ + // Core work method + // ------------------------------------------------------------------------ + + /** + * The main work method, must run as a privileged action. + * + * @return The return code for the Java process. 
+ */ + protected int runApplicationMaster() { + ActorSystem actorSystem = null; + WebMonitor webMonitor = null; + + try { + // ------- (1) load and parse / validate all configurations ------- + + // loading all config values here has the advantage that the program fails fast, if any + // configuration problem occurs + + final String currDir = ENV.get(Environment.PWD.key()); + require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key()); + + // Note that we use the "appMasterHostname" given by YARN here, to make sure + // we use the hostnames given by YARN consistently throughout akka. + // for akka "localhost" and "localhost.localdomain" are different actors. + final String appMasterHostname = ENV.get(Environment.NM_HOST.key()); + require(appMasterHostname != null, + "ApplicationMaster hostname variable %s not set", Environment.NM_HOST.key()); + + LOG.info("YARN assigned hostname for application master: {}", appMasterHostname); + + // Flink configuration + final Map<String, String> dynamicProperties = + CliFrontend.getDynamicProperties(ENV.get(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES)); + LOG.debug("YARN dynamic properties: {}", dynamicProperties); + + final Configuration config = createConfiguration(currDir, dynamicProperties); + + // Hadoop/Yarn configuration (loads config data automatically from classpath files) + final YarnConfiguration yarnConfig = new YarnConfiguration(); + + final int taskManagerContainerMemory; + final int numInitialTaskManagers; + final int slotsPerTaskManager; + + try { + taskManagerContainerMemory = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_MEMORY)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_TM_MEMORY + " : " + + e.getMessage()); + } + try { + numInitialTaskManagers = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_TM_COUNT)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + 
YarnConfigKeys.ENV_TM_COUNT + " : " + + e.getMessage()); + } + try { + slotsPerTaskManager = Integer.parseInt(ENV.get(YarnConfigKeys.ENV_SLOTS)); + } catch (NumberFormatException e) { + throw new RuntimeException("Invalid value for " + YarnConfigKeys.ENV_SLOTS + " : " + + e.getMessage()); + } + + final ContaineredTaskManagerParameters taskManagerParameters = + ContaineredTaskManagerParameters.create(config, taskManagerContainerMemory, slotsPerTaskManager); + + LOG.info("TaskManagers will be created with {} task slots", taskManagerParameters.numSlots()); + LOG.info("TaskManagers will be started with container size {} MB, JVM heap size {} MB, " + + "JVM direct memory limit {} MB", + taskManagerParameters.taskManagerTotalMemoryMB(), + taskManagerParameters.taskManagerHeapSizeMB(), + taskManagerParameters.taskManagerDirectMemoryLimitMB()); + + + // ----------------- (2) start the actor system ------------------- + + // try to start the actor system, JobManager and JobManager actor system + // using the port range definition from the config. 
+ final String amPortRange = config.getString( + ConfigConstants.YARN_APPLICATION_MASTER_PORT, + ConfigConstants.DEFAULT_YARN_JOB_MANAGER_PORT); + + actorSystem = BootstrapTools.startActorSystem(config, appMasterHostname, amPortRange, LOG); + + final String akkaHostname = AkkaUtils.getAddress(actorSystem).host().get(); + final int akkaPort = (Integer) AkkaUtils.getAddress(actorSystem).port().get(); + + LOG.info("Actor system bound to hostname {}.", akkaHostname); + + + // ---- (3) Generate the configuration for the TaskManagers + + final Configuration taskManagerConfig = BootstrapTools.generateTaskManagerConfiguration( + config, akkaHostname, akkaPort, slotsPerTaskManager, TASKMANAGER_REGISTRATION_TIMEOUT); + LOG.debug("TaskManager configuration: {}", taskManagerConfig); + + final ContainerLaunchContext taskManagerContext = createTaskManagerContext( + config, yarnConfig, ENV, + taskManagerParameters, taskManagerConfig, + currDir, getTaskManagerClass(), LOG); + + + // ---- (4) start the actors and components in this order: + + // 1) JobManager & Archive (in non-HA case, the leader service takes this) + // 2) Web Monitor (we need its port to register) + // 3) Resource Master for YARN + // 4) Process reapers for the JobManager and Resource Master + + + // 1: the JobManager + LOG.debug("Starting JobManager actor"); + + // we start the JobManager with its standard name + ActorRef jobManager = JobManager.startJobManagerActors( + config, actorSystem, + new scala.Some<>(JobManager.JOB_MANAGER_NAME()), + scala.Option.<String>empty(), + getJobManagerClass(), + getArchivistClass())._1(); + + + // 2: the web monitor + LOG.debug("Starting Web Frontend"); + + webMonitor = BootstrapTools.startWebMonitorIfConfigured(config, actorSystem, jobManager, LOG); + final String webMonitorURL = webMonitor == null ? 
null : + "http://" + appMasterHostname + ":" + webMonitor.getServerPort(); + + // 3: Flink's Yarn resource manager + LOG.debug("Starting YARN Flink Resource Manager"); + + // we need the leader retrieval service here to be informed of new + // leader session IDs, even though there can be only one leader ever + LeaderRetrievalService leaderRetriever = + LeaderRetrievalUtils.createLeaderRetrievalService(config, jobManager); + + Props resourceMasterProps = YarnFlinkResourceManager.createActorProps( + getResourceManagerClass(), + config, + yarnConfig, + leaderRetriever, + appMasterHostname, + webMonitorURL, + taskManagerParameters, + taskManagerContext, + numInitialTaskManagers, + LOG); + + ActorRef resourceMaster = actorSystem.actorOf(resourceMasterProps); + + + // 4: Process reapers + // The process reapers ensure that upon unexpected actor death, the process exits + // and does not stay lingering around unresponsive + + LOG.debug("Starting process reapers for JobManager and YARN Application Master"); + + actorSystem.actorOf( + Props.create(ProcessReaper.class, resourceMaster, LOG, ACTOR_DIED_EXIT_CODE), + "YARN_Resource_Master_Process_Reaper"); + + actorSystem.actorOf( + Props.create(ProcessReaper.class, jobManager, LOG, ACTOR_DIED_EXIT_CODE), + "JobManager_Process_Reaper"); + } + catch (Throwable t) { + // make sure that everything whatever ends up in the log + LOG.error("YARN Application Master initialization failed", t); + + if (actorSystem != null) { + try { + actorSystem.shutdown(); + } catch (Throwable tt) { + LOG.error("Error shutting down actor system", tt); + } + } + + if (webMonitor != null) { + try { + webMonitor.stop(); + } catch (Throwable ignored) {} --- End diff -- It would probably be good to log the error here instead of silently ignoring it.
> ResourceManager runtime components > ---------------------------------- > > Key: FLINK-3544 > URL: https://issues.apache.org/jira/browse/FLINK-3544 > Project: Flink > Issue Type: Sub-task > Components: ResourceManager > Affects Versions: 1.1.0 > Reporter: Maximilian Michels > Assignee: Maximilian Michels > Fix For: 1.1.0 > > -- This message was sent by Atlassian JIRA (v6.3.4#6332)