[ https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458784#comment-16458784 ]
ASF GitHub Bot commented on FLINK-8286: --------------------------------------- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/5896#discussion_r185052704 --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java --- @@ -73,61 +105,68 @@ public static void main(String[] args) { SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); + try { + SecurityUtils.getInstalledContext().runSecured( + YarnTaskExecutorRunnerFactory.create(System.getenv())); + } catch (Exception e) { + LOG.error("Exception occurred while launching Task Executor runner", e); + throw new RuntimeException(e); + } } /** - * The instance entry point for the YARN task executor. Obtains user group information and calls - * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a - * privileged action. + * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}. * - * @param args The command line arguments. + * @param envs environment variables. */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + @VisibleForTesting + protected static Runner create(Map<String, String> envs) { + LOG.debug("All environment variables: {}", envs); - final String yarnClientUsername = ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); - final String localDirs = ENV.get(Environment.LOCAL_DIRS.key()); - LOG.info("Current working/local Directory: {}", localDirs); + final String yarnClientUsername = envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME); + final String localDirs = envs.get(Environment.LOCAL_DIRS.key()); + LOG.info("Current working/local Directory: {}", localDirs); - final String currDir = ENV.get(Environment.PWD.key()); - LOG.info("Current working Directory: {}", currDir); + final String currDir = envs.get(Environment.PWD.key()); + LOG.info("Current working Directory: {}", currDir); - final String remoteKeytabPath = ENV.get(YarnConfigKeys.KEYTAB_PATH); - LOG.info("TM: remote keytab path obtained {}", remoteKeytabPath); + final String remoteKeytabPrincipal = envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL); + LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - final String remoteKeytabPrincipal = ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL); - LOG.info("TM: remote keytab principal obtained {}", remoteKeytabPrincipal); - - final Configuration configuration = GlobalConfiguration.loadConfiguration(currDir); + final Configuration configuration; + try { + configuration = GlobalConfiguration.loadConfiguration(currDir); FileSystem.initialize(configuration); + } catch (Throwable t) { + LOG.error(t.getMessage(), t); + return null; + } - // configure local directory - if (configuration.contains(CoreOptions.TMP_DIRS)) { - LOG.info("Overriding YARN's temporary file directories with those " + - "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); - } - else { - LOG.info("Setting directories for temporary files to: {}", localDirs); - configuration.setString(CoreOptions.TMP_DIRS, localDirs); - } - - // tell akka to die in case of an error - configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + // configure local directory + if (configuration.contains(CoreOptions.TMP_DIRS)) { + LOG.info("Overriding YARN's temporary file directories with those " + + "specified in the Flink config: " + configuration.getValue(CoreOptions.TMP_DIRS)); + } + else { + LOG.info("Setting directories for temporary files to: {}", localDirs); + configuration.setString(CoreOptions.TMP_DIRS, localDirs); + } - String keytabPath = null; - if (remoteKeytabPath != null) { - File f = new File(currDir, Utils.KEYTAB_FILE_NAME); - keytabPath = f.getAbsolutePath(); - LOG.info("keytab path: {}", keytabPath); - } + // tell akka to die in case of an error + configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true); + try { UserGroupInformation currentUser = UserGroupInformation.getCurrentUser(); LOG.info("YARN daemon is running as: {} Yarn client user obtainer: {}", currentUser.getShortUserName(), yarnClientUsername); + File f = new File(currDir, Utils.KEYTAB_FILE_NAME); --- End diff -- Is the only change really that we always do this instead of having the check on `remoteKeytabPath`, as the old code had? The old code had this on line 120: ``` if (remoteKeytabPath != null) { File f = new File(currDir, Utils.KEYTAB_FILE_NAME); keytabPath = f.getAbsolutePath(); LOG.info("keytab path: {}", keytabPath); } ``` > Fix Flink-Yarn-Kerberos integration for FLIP-6 > ---------------------------------------------- > > Key: FLINK-8286 > URL: https://issues.apache.org/jira/browse/FLINK-8286 > Project: Flink > Issue Type: Bug > Components: Security > Reporter: Shuyi Chen > Assignee: Shuyi Chen > Priority: Blocker > Labels: flip-6 > Fix For: 1.5.0 > > > The current Flink-Yarn-Kerberos in Flip-6 is broken. -- This message was sent by Atlassian JIRA (v7.6.3#76005)