[ 
https://issues.apache.org/jira/browse/FLINK-8286?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16458784#comment-16458784
 ] 

ASF GitHub Bot commented on FLINK-8286:
---------------------------------------

Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5896#discussion_r185052704
  
    --- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskExecutorRunnerFactory.java ---
    @@ -73,61 +105,68 @@ public static void main(String[] args) {
                SignalHandler.register(LOG);
                JvmShutdownSafeguard.installAsShutdownHook(LOG);
     
    -           run(args);
    +           try {
    +                   SecurityUtils.getInstalledContext().runSecured(
    +                                   
YarnTaskExecutorRunnerFactory.create(System.getenv()));
    +           } catch (Exception e) {
    +                   LOG.error("Exception occurred while launching Task 
Executor runner", e);
    +                   throw new RuntimeException(e);
    +           }
        }
     
        /**
    -    * The instance entry point for the YARN task executor. Obtains user 
group information and calls
    -    * the main work method {@link 
TaskManagerRunner#runTaskManager(Configuration, ResourceID)}  as a
    -    * privileged action.
    +    * Creates a {@link YarnTaskExecutorRunnerFactory.Runner}.
         *
    -    * @param args The command line arguments.
    +    * @param envs environment variables.
         */
    -   private static void run(String[] args) {
    -           try {
    -                   LOG.debug("All environment variables: {}", ENV);
    +   @VisibleForTesting
    +   protected static Runner create(Map<String, String> envs) {
    +           LOG.debug("All environment variables: {}", envs);
     
    -                   final String yarnClientUsername = 
ENV.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    -                   final String localDirs = 
ENV.get(Environment.LOCAL_DIRS.key());
    -                   LOG.info("Current working/local Directory: {}", 
localDirs);
    +           final String yarnClientUsername = 
envs.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
    +           final String localDirs = envs.get(Environment.LOCAL_DIRS.key());
    +           LOG.info("Current working/local Directory: {}", localDirs);
     
    -                   final String currDir = ENV.get(Environment.PWD.key());
    -                   LOG.info("Current working Directory: {}", currDir);
    +           final String currDir = envs.get(Environment.PWD.key());
    +           LOG.info("Current working Directory: {}", currDir);
     
    -                   final String remoteKeytabPath = 
ENV.get(YarnConfigKeys.KEYTAB_PATH);
    -                   LOG.info("TM: remote keytab path obtained {}", 
remoteKeytabPath);
    +           final String remoteKeytabPrincipal = 
envs.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    +           LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
     
    -                   final String remoteKeytabPrincipal = 
ENV.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
    -                   LOG.info("TM: remote keytab principal obtained {}", 
remoteKeytabPrincipal);
    -
    -                   final Configuration configuration = 
GlobalConfiguration.loadConfiguration(currDir);
    +           final Configuration configuration;
    +           try {
    +                   configuration = 
GlobalConfiguration.loadConfiguration(currDir);
                        FileSystem.initialize(configuration);
    +           } catch (Throwable t) {
    +                   LOG.error(t.getMessage(), t);
    +                   return null;
    +           }
     
    -                   // configure local directory
    -                   if (configuration.contains(CoreOptions.TMP_DIRS)) {
    -                           LOG.info("Overriding YARN's temporary file 
directories with those " +
    -                                   "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
    -                   }
    -                   else {
    -                           LOG.info("Setting directories for temporary 
files to: {}", localDirs);
    -                           configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
    -                   }
    -
    -                   // tell akka to die in case of an error
    -                   
configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, true);
    +           // configure local directory
    +           if (configuration.contains(CoreOptions.TMP_DIRS)) {
    +                   LOG.info("Overriding YARN's temporary file directories 
with those " +
    +                           "specified in the Flink config: " + 
configuration.getValue(CoreOptions.TMP_DIRS));
    +           }
    +           else {
    +                   LOG.info("Setting directories for temporary files to: 
{}", localDirs);
    +                   configuration.setString(CoreOptions.TMP_DIRS, 
localDirs);
    +           }
     
    -                   String keytabPath = null;
    -                   if (remoteKeytabPath != null) {
    -                           File f = new File(currDir, 
Utils.KEYTAB_FILE_NAME);
    -                           keytabPath = f.getAbsolutePath();
    -                           LOG.info("keytab path: {}", keytabPath);
    -                   }
    +           // tell akka to die in case of an error
    +           configuration.setBoolean(AkkaOptions.JVM_EXIT_ON_FATAL_ERROR, 
true);
     
    +           try {
                        UserGroupInformation currentUser = 
UserGroupInformation.getCurrentUser();
     
                        LOG.info("YARN daemon is running as: {} Yarn client 
user obtainer: {}",
                                        currentUser.getShortUserName(), 
yarnClientUsername);
     
    +                   File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
    --- End diff --
    
    Is the only change here really that we now always do this, instead of 
guarding it with the check on `remoteKeytabPath` as the old code did?
    
    The old code had this on line 120:
    ```
    if (remoteKeytabPath != null) {
        File f = new File(currDir, Utils.KEYTAB_FILE_NAME);
        keytabPath = f.getAbsolutePath();
        LOG.info("keytab path: {}", keytabPath);
    }
    ```


> Fix Flink-Yarn-Kerberos integration for FLIP-6
> ----------------------------------------------
>
>                 Key: FLINK-8286
>                 URL: https://issues.apache.org/jira/browse/FLINK-8286
>             Project: Flink
>          Issue Type: Bug
>          Components: Security
>            Reporter: Shuyi Chen
>            Assignee: Shuyi Chen
>            Priority: Blocker
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> The current Flink-Yarn-Kerberos integration in FLIP-6 is broken.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to