Repository: oozie
Updated Branches:
  refs/heads/master 50f4b5984 -> c29d9c5fc


OOZIE-2900 Retrieve tokens for oozie.launcher.mapreduce.job.hdfs-servers before submission (asasvari)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c29d9c5f
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c29d9c5f
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c29d9c5f

Branch: refs/heads/master
Commit: c29d9c5fce27f5f5335662ce2649387258f1c7fd
Parents: 50f4b59
Author: Attila Sasvari <asasv...@cloudera.com>
Authored: Fri Nov 24 15:36:26 2017 +0100
Committer: Attila Sasvari <asasv...@cloudera.com>
Committed: Fri Nov 24 15:36:26 2017 +0100

----------------------------------------------------------------------
 .../action/hadoop/DistcpActionExecutor.java     | 31 +++++++++++
 .../oozie/action/hadoop/HDFSCredentials.java    | 55 ++++++++++++++++++-
 .../oozie/action/hadoop/JavaActionExecutor.java | 56 +++++++++++++-------
 release-log.txt                                 |  1 +
 4 files changed, 121 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
index 81e28f7..89c8440 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java
@@ -20,6 +20,7 @@ package org.apache.oozie.action.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.oozie.action.ActionExecutorException;
 import org.apache.oozie.service.ConfigurationService;
 import org.apache.oozie.util.XLog;
@@ -35,6 +36,19 @@ public class DistcpActionExecutor extends JavaActionExecutor{
     private static final XLog LOG = XLog.getLog(DistcpActionExecutor.class);
     public static final String DISTCP_TYPE = "distcp";
 
+    /**
+     * Comma separated list of NameNode hosts to obtain delegation token(s) for.
+     */
+    private static final String OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS = "oozie.launcher.mapreduce.job.hdfs-servers";
+
+    /**
+     * Comma separated list to instruct ResourceManagers on either cluster to skip delegation token renewal for NameNode hosts.
+     */
+    private static final String OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS_TOKEN_RENEWAL_EXCLUDE =
+            "oozie.launcher.mapreduce.job.hdfs-servers.token-renewal.exclude";
+    private static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude";
+
+
     public DistcpActionExecutor() {
         super("distcp");
     }
@@ -110,4 +124,21 @@ public class DistcpActionExecutor extends JavaActionExecutor{
     protected String getLauncherMain(Configuration launcherConf, Element actionXml) {
         return launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS);
     }
+
+    /**
+     * Extracts information required for DistCp action between secure clusters (in the same or distinct Kerberos realms)
+     *
+     * @param jobconf workflow action configuration
+     */
+    @Override
+    protected void setActionTokenProperties(final Configuration jobconf) {
+        final String hdfsServers = jobconf.get(OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS);
+        if (hdfsServers != null) {
+            jobconf.set(MRJobConfig.JOB_NAMENODES, hdfsServers);
+            final String tokenRenewalExclude = jobconf.get(OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS_TOKEN_RENEWAL_EXCLUDE);
+            if (tokenRenewalExclude != null) {
+                jobconf.set(JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE, tokenRenewalExclude);
+            }
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
index 92a7ebe..6c85662 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java
@@ -20,16 +20,27 @@ package org.apache.oozie.action.hadoop;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.oozie.ErrorCode;
 import org.apache.oozie.action.ActionExecutor;
+import org.apache.oozie.service.HadoopAccessorException;
+import org.apache.oozie.service.Services;
+import org.apache.oozie.service.UserGroupInformationService;
 import org.apache.oozie.util.XLog;
 
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.security.PrivilegedExceptionAction;
+
 
 public class HDFSCredentials implements CredentialsProvider {
     protected XLog LOG = XLog.getLog(getClass());
-
     /**
      * Add an HDFS_DELEGATION_TOKEN to the {@link Credentials} provided.
      * This is also important to ensure that log aggregation works correctly from the NM
@@ -43,6 +54,29 @@ public class HDFSCredentials implements CredentialsProvider {
     @Override
     public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props,
                                   ActionExecutor.Context context) throws Exception {
+        final String jobNameNodes[] = config.getStrings(MRJobConfig.JOB_NAMENODES);
+        if (jobNameNodes != null) {
+            final Path[] paths = new Path[jobNameNodes.length];
+            for (int i = 0; i != jobNameNodes.length; ++i) {
+                paths[i] = new Path(jobNameNodes[i]);
+            }
+
+            final UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class)
+                    .getProxyUser(context.getWorkflow().getUser());
+            final User user = User.create(ugi);
+
+            obtainTokensForNamenodes(credentials, config, user, paths);
+        }
+        else {
+            obtainTokenForAppFileSystemNameNode(credentials, config, context);
+        }
+
+    }
+
+    private void obtainTokenForAppFileSystemNameNode(final Credentials credentials,
+                                                     final Configuration config,
+                                                     final ActionExecutor.Context context)
+            throws IOException, CredentialException, HadoopAccessorException, URISyntaxException {
         try (FileSystem fileSystem = context.getAppFileSystem()) {
             final String renewer = new HadoopTokenHelper().getServerPrincipal(config);
             LOG.debug("Server principal present, getting HDFS delegation token. [renewer={0}]", renewer);
@@ -50,7 +84,8 @@ public class HDFSCredentials implements CredentialsProvider {
             if (hdfsDelegationToken == null) {
                 throw new CredentialException(ErrorCode.E0511, renewer);
             }
-            LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", hdfsDelegationToken);
+            LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]",
+                    hdfsDelegationToken);
             credentials.addToken(hdfsDelegationToken.getService(), hdfsDelegationToken);
         } catch (Exception e) {
             LOG.debug("exception in updateCredentials", e);
@@ -58,4 +93,20 @@ public class HDFSCredentials implements CredentialsProvider {
         }
     }
 
+    private void obtainTokensForNamenodes(final Credentials credentials,
+                                          final Configuration config,
+                                          final User user,
+                                          final Path[] paths) throws IOException, InterruptedException {
+        LOG.info(String.format("\"%s\" is present in workflow configuration. Obtaining tokens for NameNode(s) [%s]",
+                MRJobConfig.JOB_NAMENODES, config.get(MRJobConfig.JOB_NAMENODES)));
+        user.runAs(
+                new PrivilegedExceptionAction<Void>() {
+                    @Override
+                    public Void run() throws Exception {
+                        TokenCache.obtainTokensForNamenodes(credentials, paths, config);
+                        return null;
+                    }
+                }
+        );
+    }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
index a1df304..cab0d8b 100644
--- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
+++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
@@ -55,7 +55,7 @@ import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.TaskLog;
-import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.AccessControlException;
@@ -65,7 +65,6 @@ import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.ApplicationConstants;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
-import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
@@ -1373,31 +1372,48 @@ public class JavaActionExecutor extends ActionExecutor {
 
     protected void setCredentialTokens(Credentials credentials, Configuration jobconf, Context context, WorkflowAction action,
                                        Map<String, CredentialsProperties> credPropertiesMap) throws Exception {
+        if (!isValidCredentialTokensPreconditions(context, action, credPropertiesMap)) {
+            LOG.debug("Not obtaining delegation token(s) as preconditions do not hold.");
+            return;
+        }
 
-        if (context != null && action != null && credPropertiesMap != null) {
-            // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin
-            CredentialsProviderFactory.ensureKerberosLogin();
-            for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
-                String credName = entry.getKey();
-                CredentialsProperties credProps = entry.getValue();
-                if (credProps != null) {
-                    CredentialsProvider tokenProvider = CredentialsProviderFactory.getInstance()
-                            .createCredentialsProvider(credProps.getType());
-                    if (tokenProvider != null) {
-                        tokenProvider.updateCredentials(credentials, jobconf, credProps, context);
-                        LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
-                    }
-                    else {
-                        LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
-                        throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
+        setActionTokenProperties(jobconf);
+        // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin
+        CredentialsProviderFactory.ensureKerberosLogin();
+        for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) {
+            String credName = entry.getKey();
+            CredentialsProperties credProps = entry.getValue();
+            if (credProps != null) {
+                CredentialsProvider tokenProvider = CredentialsProviderFactory.getInstance()
+                        .createCredentialsProvider(credProps.getType());
+                if (tokenProvider != null) {
+                    tokenProvider.updateCredentials(credentials, jobconf, credProps, context);
+                    LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId());
+                } else {
+                    LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType());
+                    throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020",
                             "Could not load credentials of type [{0}] with name [{1}]]; perhaps it was not defined"
-                                + " in oozie-site.xml?", credProps.getType(), credName);
-                    }
+                                    + " in oozie-site.xml?", credProps.getType(), credName);
                 }
             }
         }
     }
 
+    private boolean isValidCredentialTokensPreconditions(final Context context,
+                                                         final WorkflowAction action,
+                                                         final Map<String, CredentialsProperties> credPropertiesMap) {
+        return context != null && action != null && credPropertiesMap != null;
+    }
+
+    /**
+     * Subclasses may override this method in order to take additional actions required for obtaining credential token(s).
+     *
+     * @param jobconf workflow action configuration
+     */
+    protected void setActionTokenProperties(final Configuration jobconf) {
+        // nop
+    }
+
     protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context,
             WorkflowAction action) throws Exception {
         HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>();

http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 4620875..f97b3d5 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.0.0 release (trunk - unreleased)
 
+OOZIE-2900 Retrieve tokens for oozie.launcher.mapreduce.job.hdfs-servers before submission (asasvari)
 OOZIE-3132 Instrument SLACalculatorMemory (andras.piros)
 OOZIE-2945 Update SpotBugs to stable version after GA (dbist13 via gezapeti)
 OOZIE-3114 Fix javadoc for warning: no @return (dbist13 via gezapeti)

Reply via email to