Repository: oozie Updated Branches: refs/heads/master 50f4b5984 -> c29d9c5fc
OOZIE-2900 Retrieve tokens for oozie.launcher.mapreduce.job.hdfs-servers before submission (asasvari) Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/c29d9c5f Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/c29d9c5f Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/c29d9c5f Branch: refs/heads/master Commit: c29d9c5fce27f5f5335662ce2649387258f1c7fd Parents: 50f4b59 Author: Attila Sasvari <asasv...@cloudera.com> Authored: Fri Nov 24 15:36:26 2017 +0100 Committer: Attila Sasvari <asasv...@cloudera.com> Committed: Fri Nov 24 15:36:26 2017 +0100 ---------------------------------------------------------------------- .../action/hadoop/DistcpActionExecutor.java | 31 +++++++++++ .../oozie/action/hadoop/HDFSCredentials.java | 55 ++++++++++++++++++- .../oozie/action/hadoop/JavaActionExecutor.java | 56 +++++++++++++------- release-log.txt | 1 + 4 files changed, 121 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java index 81e28f7..89c8440 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/DistcpActionExecutor.java @@ -20,6 +20,7 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.util.XLog; @@ -35,6 +36,19 @@ public class DistcpActionExecutor extends JavaActionExecutor{ private static final XLog LOG = XLog.getLog(DistcpActionExecutor.class); public static final String DISTCP_TYPE = "distcp"; + /** + * Comma separated list of NameNode hosts to obtain delegation token(s) for. + */ + private static final String OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS = "oozie.launcher.mapreduce.job.hdfs-servers"; + + /** + * Comma separated list to instruct ResourceManagers on either cluster to skip delegation token renewal for NameNode hosts. + */ + private static final String OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS_TOKEN_RENEWAL_EXCLUDE = + "oozie.launcher.mapreduce.job.hdfs-servers.token-renewal.exclude"; + private static final String JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE = "mapreduce.job.hdfs-servers.token-renewal.exclude"; + + public DistcpActionExecutor() { super("distcp"); } @@ -110,4 +124,21 @@ public class DistcpActionExecutor extends JavaActionExecutor{ protected String getLauncherMain(Configuration launcherConf, Element actionXml) { return launcherConf.get(LauncherAMUtils.CONF_OOZIE_ACTION_MAIN_CLASS, CONF_OOZIE_DISTCP_ACTION_MAIN_CLASS); } + + /** + * Extracts information required for DistCp action between secure clusters (in the same or distinct Kerberos realms) + * + * @param jobconf workflow action configuration + */ + @Override + protected void setActionTokenProperties(final Configuration jobconf) { + final String hdfsServers = jobconf.get(OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS); + if (hdfsServers != null) { + jobconf.set(MRJobConfig.JOB_NAMENODES, hdfsServers); + final String tokenRenewalExclude = jobconf.get(OOZIE_LAUNCHER_MAPREDUCE_JOB_HDFS_SERVERS_TOKEN_RENEWAL_EXCLUDE); + if (tokenRenewalExclude != null) { + jobconf.set(JOB_NAMENODES_TOKEN_RENEWAL_EXCLUDE, tokenRenewalExclude); + } + } + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java index 92a7ebe..6c85662 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java @@ -20,16 +20,27 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.security.TokenCache; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.oozie.ErrorCode; import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.service.HadoopAccessorException; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.UserGroupInformationService; import org.apache.oozie.util.XLog; +import java.io.IOException; +import java.net.URISyntaxException; +import java.security.PrivilegedExceptionAction; + public class HDFSCredentials implements CredentialsProvider { protected XLog LOG = XLog.getLog(getClass()); - /** * Add an HDFS_DELEGATION_TOKEN to the {@link Credentials} provided. * This is also important to ensure that log aggregation works correctly from the NM @@ -43,6 +54,29 @@ public class HDFSCredentials implements CredentialsProvider { @Override public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props, ActionExecutor.Context context) throws Exception { + final String jobNameNodes[] = config.getStrings(MRJobConfig.JOB_NAMENODES); + if (jobNameNodes != null) { + final Path[] paths = new Path[jobNameNodes.length]; + for (int i = 0; i != jobNameNodes.length; ++i) { + paths[i] = new Path(jobNameNodes[i]); + } + + final UserGroupInformation ugi = Services.get().get(UserGroupInformationService.class) + .getProxyUser(context.getWorkflow().getUser()); + final User user = User.create(ugi); + + obtainTokensForNamenodes(credentials, config, user, paths); + } + else { + obtainTokenForAppFileSystemNameNode(credentials, config, context); + } + + } + + private void obtainTokenForAppFileSystemNameNode(final Credentials credentials, + final Configuration config, + final ActionExecutor.Context context) + throws IOException, CredentialException, HadoopAccessorException, URISyntaxException { try (FileSystem fileSystem = context.getAppFileSystem()) { final String renewer = new HadoopTokenHelper().getServerPrincipal(config); LOG.debug("Server principal present, getting HDFS delegation token. [renewer={0}]", renewer); @@ -50,7 +84,8 @@ public class HDFSCredentials implements CredentialsProvider { if (hdfsDelegationToken == null) { throw new CredentialException(ErrorCode.E0511, renewer); } - LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", hdfsDelegationToken); + LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", + hdfsDelegationToken); credentials.addToken(hdfsDelegationToken.getService(), hdfsDelegationToken); } catch (Exception e) { LOG.debug("exception in updateCredentials", e); @@ -58,4 +93,20 @@ public class HDFSCredentials implements CredentialsProvider { } } + private void obtainTokensForNamenodes(final Credentials credentials, + final Configuration config, + final User user, + final Path[] paths) throws IOException, InterruptedException { + LOG.info(String.format("\"%s\" is present in workflow configuration. Obtaining tokens for NameNode(s) [%s]", + MRJobConfig.JOB_NAMENODES, config.get(MRJobConfig.JOB_NAMENODES))); + user.runAs( + new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws Exception { + TokenCache.obtainTokensForNamenodes(credentials, paths, config); + return null; + } + } + ); + } } http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index a1df304..cab0d8b 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -55,7 +55,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.TaskLog; -import org.apache.hadoop.mapreduce.MRConfig; +import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.filecache.ClientDistributedCacheManager; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.AccessControlException; @@ -65,7 +65,6 @@ import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope; -import org.apache.hadoop.yarn.api.records.ApplicationAccessType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -1373,31 +1372,48 @@ public class JavaActionExecutor extends ActionExecutor { protected void setCredentialTokens(Credentials credentials, Configuration jobconf, Context context, WorkflowAction action, Map<String, CredentialsProperties> credPropertiesMap) throws Exception { + if (!isValidCredentialTokensPreconditions(context, action, credPropertiesMap)) { + LOG.debug("Not obtaining delegation token(s) as preconditions do not hold."); + return; + } - if (context != null && action != null && credPropertiesMap != null) { - // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin - CredentialsProviderFactory.ensureKerberosLogin(); - for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { - String credName = entry.getKey(); - CredentialsProperties credProps = entry.getValue(); - if (credProps != null) { - CredentialsProvider tokenProvider = CredentialsProviderFactory.getInstance() - .createCredentialsProvider(credProps.getType()); - if (tokenProvider != null) { - tokenProvider.updateCredentials(credentials, jobconf, credProps, context); - LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); - } - else { - LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); - throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020", + setActionTokenProperties(jobconf); + // Make sure we're logged into Kerberos; if not, or near expiration, it will relogin + CredentialsProviderFactory.ensureKerberosLogin(); + for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { + String credName = entry.getKey(); + CredentialsProperties credProps = entry.getValue(); + if (credProps != null) { + CredentialsProvider tokenProvider = CredentialsProviderFactory.getInstance() + .createCredentialsProvider(credProps.getType()); + if (tokenProvider != null) { + tokenProvider.updateCredentials(credentials, jobconf, credProps, context); + LOG.debug("Retrieved Credential '" + credName + "' for action " + action.getId()); + } else { + LOG.debug("Credentials object is null for name= " + credName + ", type=" + credProps.getType()); + throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA020", "Could not load credentials of type [{0}] with name [{1}]]; perhaps it was not defined" - + " in oozie-site.xml?", credProps.getType(), credName); - } + + " in oozie-site.xml?", credProps.getType(), credName); } } } } + private boolean isValidCredentialTokensPreconditions(final Context context, + final WorkflowAction action, + final Map<String, CredentialsProperties> credPropertiesMap) { + return context != null && action != null && credPropertiesMap != null; + } + + /** + * Subclasses may override this method in order to take additional actions required for obtaining credential token(s). + * + * @param jobconf workflow action configuration + */ + protected void setActionTokenProperties(final Configuration jobconf) { + // nop + } + protected HashMap<String, CredentialsProperties> getActionCredentialsProperties(Context context, WorkflowAction action) throws Exception { HashMap<String, CredentialsProperties> props = new HashMap<String, CredentialsProperties>(); http://git-wip-us.apache.org/repos/asf/oozie/blob/c29d9c5f/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 4620875..f97b3d5 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-2900 Retrieve tokens for oozie.launcher.mapreduce.job.hdfs-servers before submission (asasvari) OOZIE-3132 Instrument SLACalculatorMemory (andras.piros) OOZIE-2945 Update SpotBugs to stable version after GA (dbist13 via gezapeti) OOZIE-3114 Fix javadoc for warning: no @return (dbist13 via gezapeti)