[ https://issues.apache.org/jira/browse/FLINK-4928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15690837#comment-15690837 ]
ASF GitHub Bot commented on FLINK-4928: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2744#discussion_r88944228 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkApplicationMasterRunner.java --- @@ -0,0 +1,612 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.yarn; + +import akka.actor.ActorSystem; +import org.apache.flink.api.common.JobExecutionResult; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.BootstrapTools; +import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobmanager.OnCompletionActions; +import org.apache.flink.runtime.jobmaster.JobManagerServices; +import org.apache.flink.runtime.jobmaster.JobMaster; +import org.apache.flink.runtime.leaderelection.LeaderContender; +import org.apache.flink.runtime.leaderelection.LeaderElectionService; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.exceptions.ConfigurationException; +import org.apache.flink.runtime.resourcemanager.slotmanager.DefaultSlotManager; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerFactory; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.apache.flink.runtime.rpc.akka.AkkaRpcService; +import org.apache.flink.runtime.security.SecurityContext; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.yarn.cli.FlinkYarnSessionCli; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.concurrent.duration.FiniteDuration; + +import javax.annotation.concurrent.GuardedBy; + +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.ObjectInputStream; +import java.util.Map; +import java.util.UUID; + +/** + * This class is the executable entry point for the YARN application master. + * It starts actor system and the actors for {@link org.apache.flink.runtime.jobmaster.JobMaster} + * and {@link org.apache.flink.yarn.YarnResourceManager}. + * + * The JobMasters handles Flink job execution, while the YarnResourceManager handles container + * allocation and failure detection. + */ +public class YarnFlinkApplicationMasterRunner implements LeaderContender, OnCompletionActions, FatalErrorHandler { + + /** Logger */ + protected static final Logger LOG = LoggerFactory.getLogger(YarnFlinkApplicationMasterRunner.class); + + /** The process environment variables */ + private static final Map<String, String> ENV = System.getenv(); + + /** The exit code returned if the initialization of the application master failed */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + /** The job graph file path */ + private static final String JOB_GRAPH_FILE_PATH = "flink.jobgraph.path"; + + /** The lock to guard startup / shutdown / manipulation methods */ + private final Object lock = new Object(); + + @GuardedBy("lock") + private MetricRegistry metricRegistry; + + @GuardedBy("lock") + private HighAvailabilityServices haServices; + + @GuardedBy("lock") + private LeaderElectionService jmLeaderElectionService; + + @GuardedBy("lock") + private RpcService jobMasterRpcService; + + @GuardedBy("lock") + private RpcService resourceManagerRpcService; + + @GuardedBy("lock") + private ResourceManager resourceManager; + + @GuardedBy("lock") + private JobMaster jobMaster; + + @GuardedBy("lock") + JobManagerServices jobManagerServices; + + @GuardedBy("lock") + JobManagerMetricGroup jobManagerMetrics; + + @GuardedBy("lock") + private JobGraph jobGraph; + + /** Flag marking the app master runner as started/running */ + private volatile boolean running; + // ------------------------------------------------------------------------ + // Program entry point + // ------------------------------------------------------------------------ + + /** + * The entry point for the YARN application master. + * + * @param args The command line arguments. + */ + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN ApplicationMaster runner", args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + // run and exit with the proper return code + int returnCode = new YarnFlinkApplicationMasterRunner().run(args); + System.exit(returnCode); + } + + /** + * The instance entry point for the YARN application master. Obtains user group + * information and calls the main work method {@link #runApplicationMaster(org.apache.flink.configuration.Configuration)} as a + * privileged action. + * + * @param args The command line arguments. + * @return The process exit code. + */ + protected int run(String[] args) { + try { + LOG.debug("All environment variables: {}", ENV); + + final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + require(yarnClientUsername != null, "YARN client user name environment variable {} not set", + YarnConfigKeys.ENV_HADOOP_USER_NAME); + + final String currDir = ENV.get(Environment.PWD.key()); + require(currDir != null, "Current working directory variable (%s) not set", Environment.PWD.key()); + LOG.debug("Current working Directory: {}", currDir); + + final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); + LOG.debug("remoteKeytabPath obtained {}", remoteKeytabPath); --- End diff -- I think it would be better to use non camel case here and simply spell it out since this is a logging statement which is user facing. > Implement FLIP-6 YARN Application Master Runner > ----------------------------------------------- > > Key: FLINK-4928 > URL: https://issues.apache.org/jira/browse/FLINK-4928 > Project: Flink > Issue Type: Sub-task > Components: YARN > Environment: {{flip-6}} feature branch > Reporter: Stephan Ewen > Assignee: shuai.xu > > The Application Master Runner is the master process started in a YARN > container when submitting the Flink-on-YARN job to YARN. > It has the following data available: > - Flink jars > - Job jars > - JobGraph > - Environment variables > - Contextual information like security tokens and certificates > Its responsibility is the following: > - Read all configuration and environment variables, computing the effective > configuration > - Start all shared components (Rpc, HighAvailability Services) > - Start the ResourceManager > - Start the JobManager Runner -- This message was sent by Atlassian JIRA (v6.3.4#6332)