OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti)
Project: http://git-wip-us.apache.org/repos/asf/oozie/repo Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/9cb4bd05 Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/9cb4bd05 Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/9cb4bd05 Branch: refs/heads/master Commit: 9cb4bd05aa4a86d2193efe5f7bb9ef53f8fc33d2 Parents: d135b88 Author: Gezapeti Cseh <gezap...@apache.org> Authored: Wed Sep 20 12:17:27 2017 +0200 Committer: Gezapeti Cseh <gezap...@apache.org> Committed: Wed Sep 20 12:17:27 2017 +0200 ---------------------------------------------------------------------- .../main/java/org/apache/oozie/ErrorCode.java | 3 + .../action/hadoop/CredentialsProperties.java | 5 + .../hadoop/CredentialsProviderFactory.java | 17 +- .../action/hadoop/HCatCredentialHelper.java | 5 +- .../oozie/action/hadoop/HCatCredentials.java | 2 +- .../oozie/action/hadoop/HDFSCredentials.java | 61 +++ .../oozie/action/hadoop/HadoopTokenHelper.java | 85 +++++ .../oozie/action/hadoop/HbaseCredentials.java | 4 +- .../oozie/action/hadoop/Hive2Credentials.java | 14 +- .../oozie/action/hadoop/JHSCredentials.java | 119 ++++++ .../oozie/action/hadoop/JavaActionExecutor.java | 203 +++------- .../oozie/action/hadoop/YarnRMCredentials.java | 67 ++++ .../oozie/service/HadoopAccessorService.java | 91 +---- .../action/hadoop/TestHadoopTokenHelper.java | 44 +++ .../action/hadoop/TestJavaActionExecutor.java | 1 - .../oozie/action/hadoop/TestShellMain.java | 2 +- .../wf/TestWorkflowActionKillXCommand.java | 2 +- .../service/TestHadoopAccessorService.java | 31 -- release-log.txt | 1 + .../action/hadoop/AMRMClientAsyncFactory.java | 4 +- .../oozie/action/hadoop/HdfsOperations.java | 84 ++--- .../apache/oozie/action/hadoop/LauncherAM.java | 367 ++++++++++--------- .../apache/oozie/action/hadoop/ShellMain.java | 1 - .../oozie/action/hadoop/TestHdfsOperations.java | 11 - .../oozie/action/hadoop/TestLauncherAM.java | 36 +- 25 files changed, 728 insertions(+), 532 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/ErrorCode.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/ErrorCode.java b/core/src/main/java/org/apache/oozie/ErrorCode.java index 662e1ed..168c4fa 100644 --- a/core/src/main/java/org/apache/oozie/ErrorCode.java +++ b/core/src/main/java/org/apache/oozie/ErrorCode.java @@ -89,6 +89,9 @@ public enum ErrorCode { E0508(XLog.OPS, "User [{0}] not authorized for WF job [{1}]"), E0509(XLog.OPS, "User [{0}] not authorized for Coord job [{1}]"), E0510(XLog.OPS, "Unable to get Credential [{0}]"), + E0511(XLog.STD, "No HDFS delegation token present, can''t set credentials. [serverPrincipal={0}]"), + E0512(XLog.STD, "Could not get RM delegation token: {0}"), + E0513(XLog.STD, "No YARN renewer present, can''t get token. [servicePrincipal={0}]"), E0550(XLog.OPS, "Could not normalize host name [{0}], {1}"), E0551(XLog.OPS, "Missing [{0}] property"), http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java index 20f93ce..3dea787 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProperties.java @@ -84,4 +84,9 @@ public class CredentialsProperties { public void setProperties(HashMap<String, String> properties) { this.properties = properties; } + + @Override + public String toString() { + return String.format("name=%s, type=%s", name, type); + } } 
http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java index 5ca8d3e..a353e15 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/CredentialsProviderFactory.java @@ -23,15 +23,20 @@ import java.util.HashMap; import java.util.Map; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.io.Text; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.oozie.service.ConfigurationService; import org.apache.oozie.util.XLog; public class CredentialsProviderFactory { public static final String CRED_KEY = "oozie.credentials.credentialclasses"; private static final XLog LOG = XLog.getLog(CredentialsProviderFactory.class); + public static final String HDFS = "hdfs"; + public static final String YARN = "yarnRM"; + public static final String JHS = "jhs"; private static CredentialsProviderFactory instance; - private final Map<String, Class<CredentialsProvider>> providerCache; + private final Map<String, Class<? 
extends CredentialsProvider>> providerCache; @VisibleForTesting static void destroy() { @@ -70,6 +75,14 @@ public class CredentialsProviderFactory { } } } + providerCache.put(HDFS, HDFSCredentials.class); + providerCache.put(YARN, YarnRMCredentials.class); + providerCache.put(JHS, JHSCredentials.class); + } + + static Text getUniqueAlias(Token<?> token) { + return new Text(String.format("%s_%s_%d", token.getKind().toString(), + token.getService().toString(), System.currentTimeMillis())); } /** @@ -80,7 +93,7 @@ public class CredentialsProviderFactory { * @throws Exception */ public CredentialsProvider createCredentialsProvider(String type) throws Exception { - Class<CredentialsProvider> providerClass = providerCache.get(type); + Class<? extends CredentialsProvider> providerClass = providerCache.get(type); if(providerClass == null){ return null; } http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java index 9804c7b..274db78 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentialHelper.java @@ -20,7 +20,6 @@ package org.apache.oozie.action.hadoop; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SaslRpcServer; @@ -63,8 +62,8 @@ public class HCatCredentialHelper { .getLoginUser().getShortUserName()); Token<DelegationTokenIdentifier> hcatToken = new Token<DelegationTokenIdentifier>(); 
hcatToken.decodeFromUrlString(tokenStrForm); - credentials.addToken(new Text("HCat Token"), hcatToken); - XLog.getLog(getClass()).debug("Added the HCat token to launcher configuration"); + credentials.addToken(CredentialsProviderFactory.getUniqueAlias(hcatToken), hcatToken); + XLog.getLog(getClass()).debug("Added the HCat token to launcher's credentials"); } catch (Exception ex) { XLog.getLog(getClass()).debug("set Exception {0}", ex.getMessage()); http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java index 52abbf1..47b2407 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HCatCredentials.java @@ -70,7 +70,7 @@ public class HCatCredentials implements CredentialsProvider { hcch.set(credentials, config, principal, server); } catch (Exception e) { - XLog.getLog(getClass()).warn("Exception in addtoJobConf", e); + XLog.getLog(getClass()).warn("Exception in updateCredentials", e); throw e; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java new file mode 100644 index 0000000..c693399 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HDFSCredentials.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.util.XLog; + + +public class HDFSCredentials implements CredentialsProvider { + protected XLog LOG = XLog.getLog(getClass()); + + /** + * Add an HDFS_DELEGATION_TOKEN to the {@link Credentials} provided. + * This is also important to ensure that log aggregation works correctly from the NM + * + * @param credentials the credentials object which is updated + * @param config launcher AM configuration + * @param props properties for getting credential token or certificate + * @param context workflow context + * @throws Exception thrown if failed + */ + @Override + public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props, + ActionExecutor.Context context) throws Exception { + try (FileSystem fileSystem = context.getAppFileSystem()) { + final String renewer = new HadoopTokenHelper().getServerPrincipal(config); + LOG.debug("Server principal present, getting HDFS delegation token. 
[renewer={0}]", renewer); + final Token hdfsDelegationToken = fileSystem.getDelegationToken(renewer); + if (hdfsDelegationToken == null) { + throw new CredentialException(ErrorCode.E0511, renewer); + } + LOG.info("Got HDFS delegation token, setting credentials. [hdfsDelegationToken={0}]", hdfsDelegationToken); + credentials.addToken(hdfsDelegationToken.getService(), hdfsDelegationToken); + } catch (Exception e) { + LOG.debug("exception in updateCredentials", e); + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java b/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java new file mode 100644 index 0000000..1018d9d --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HadoopTokenHelper.java @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.service.HadoopAccessorException; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.util.XLog; + +import java.io.IOException; +import java.net.URISyntaxException; + +public class HadoopTokenHelper { + /** The Kerberos principal for the resource manager.*/ + protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal"; + protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; + private XLog LOG = XLog.getLog(getClass()); + + private String getServicePrincipal(final Configuration configuration) { + return configuration.get(RM_PRINCIPAL); + } + + String getServerPrincipal(final Configuration configuration) throws IOException { + return getServerPrincipal(configuration, getServicePrincipal(configuration)); + } + + /** + * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal for use as delegation token renewer. 
+ * + * @param configuration the {@link Configuration} containing the YARN RM address + * @param servicePrincipal the configured service principal + * @return the server principal originating from the host name and the service principal + * @throws IOException when something goes wrong finding out the local address inside + * {@link SecurityUtil#getServerPrincipal(String, String)} + */ + private String getServerPrincipal(final Configuration configuration, final String servicePrincipal) throws IOException { + Preconditions.checkNotNull(configuration, "configuration has to be filled"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal has to be filled"); + Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)), + String.format("configuration entry %s has to be filled", HADOOP_YARN_RM)); + + String serverPrincipal; + final String target = configuration.get(HADOOP_YARN_RM); + + try { + final String addr = NetUtils.createSocketAddr(target).getHostName(); + serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr); + LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal, target); + } catch (final IllegalArgumentException iae) { + LOG.warn("An error happened while trying to get server principal. 
Getting it from service principal anyway.", iae); + + serverPrincipal = servicePrincipal.split("[/@]")[0]; + LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal); + } + + return serverPrincipal; + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java index 4add5f1..bc87f29 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/HbaseCredentials.java @@ -79,6 +79,7 @@ public class HbaseCredentials implements CredentialsProvider { User u = User.create(ugi); // A direct doAs is required here vs. User#obtainAuthTokenForJob(...) // See OOZIE-2419 for more + XLog.getLog(getClass()).debug("Getting Hbase token for user {0}", user); Token<AuthenticationTokenIdentifier> token = u.runAs( new PrivilegedExceptionAction<Token<AuthenticationTokenIdentifier>>() { public Token<AuthenticationTokenIdentifier> run() throws Exception { @@ -90,7 +91,8 @@ public class HbaseCredentials implements CredentialsProvider { } } ); - credentials.addToken(token.getService(), token); + XLog.getLog(getClass()).debug("Got token, adding it to credentials."); + credentials.addToken(CredentialsProviderFactory.getUniqueAlias(token), token); } private void addPropsConf(CredentialsProperties props, Configuration destConf) { http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java index 
0b495f7..d34f560 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/Hive2Credentials.java @@ -23,7 +23,6 @@ import java.sql.DriverManager; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; -import org.apache.hadoop.io.Text; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hive.jdbc.HiveConnection; @@ -32,9 +31,9 @@ import org.apache.oozie.action.ActionExecutor.Context; import org.apache.oozie.util.XLog; /** - * Credentials implementation to store in jobConf, Hive Server 2 specific properties + * Credentials implementation, Hive Server 2 specific properties * User specifies these credential properties along with the action configuration - * The jobConf is used further to pass credentials to the tasks while running + * The credentials are used further to pass credentials to the tasks while running * Oozie server should be configured to use this class by including it via property 'oozie.credentials.credentialclasses' * User can extend the parent class to implement own class as well * for handling custom token-based credentials and add to the above server property @@ -60,7 +59,7 @@ public class Hive2Credentials implements CredentialsProvider { String principal = props.getProperties().get(HIVE2_SERVER_PRINCIPAL); if (principal == null || principal.isEmpty()) { throw new CredentialException(ErrorCode.E0510, - HIVE2_SERVER_PRINCIPAL + " is required to get hive server 2 credential"); + HIVE2_SERVER_PRINCIPAL + " is required to get hive server 2 credentials"); } url = url + ";principal=" + principal; Connection con = null; @@ -79,12 +78,13 @@ public class Hive2Credentials implements CredentialsProvider { Token<DelegationTokenIdentifier> hive2Token = new Token<DelegationTokenIdentifier>(); hive2Token.decodeFromUrlString(tokenStr); - 
credentials.addToken(new Text("hive.server2.delegation.token"), hive2Token); - XLog.getLog(getClass()).debug("Added the Hive Server 2 token in job conf"); + credentials.addToken(CredentialsProviderFactory.getUniqueAlias(hive2Token), hive2Token); + XLog.getLog(getClass()).debug("Added the Hive Server 2 token to launcher's credential"); } catch (Exception e) { - XLog.getLog(getClass()).warn("Exception in addtoJobConf", e); + XLog.getLog(getClass()).warn("Exception in obtaining Hive2 token", e); throw e; } } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java new file mode 100644 index 0000000..d9099c5 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JHSCredentials.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.service.Services; +import org.apache.oozie.service.UserGroupInformationService; +import org.apache.oozie.util.XLog; + +import java.io.IOException; +import java.security.PrivilegedAction; + +public class JHSCredentials implements CredentialsProvider { + protected XLog LOG = XLog.getLog(getClass()); + + + /** + * Add an MR_DELEGATION_TOKEN to the {@link Credentials} provided. 
+ * @param credentials the credentials object which is updated + * @param config launcher AM configuration + * @param props properties for getting credential token or certificate + * @param context workflow context + * @throws Exception thrown if failed + */ + @Override + public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props, + ActionExecutor.Context context) throws Exception { + try { + LOG.debug("Instantiating JHS Proxy"); + MRClientProtocol hsProxy = instantiateHistoryProxy(config, context); + Text hsService = SecurityUtil.buildTokenService(hsProxy.getConnectAddress()); + LOG.debug("Getting delegation token for {0}", hsService.toString()); + Token<?> jhsToken = getDelegationTokenFromJHS(hsProxy, new HadoopTokenHelper().getServerPrincipal(config)); + LOG.debug("Acquired token {0}", jhsToken); + credentials.addToken(hsService, jhsToken); + } catch (IOException | InterruptedException ex) { + LOG.debug("exception in updateCredentials", ex); + throw new CredentialException(ErrorCode.E0512, ex.getMessage(), ex); + } + } + + /** + * Get a Delegation token from the JHS. + * Copied over from YARNRunner in Hadoop. + * @param hsProxy protocol used to get the token + * @return The MR_DELEGATION_TOKEN that can be used to talk to JHS + * @throws IOException + * @throws InterruptedException + */ + private Token<?> getDelegationTokenFromJHS(final MRClientProtocol hsProxy, final String renewer) + throws IOException, InterruptedException { + GetDelegationTokenRequest request = RecordFactoryProvider + .getRecordFactory(null).newRecordInstance(GetDelegationTokenRequest.class); + LOG.debug("Creating requsest to JHS using renewer [{0}]", renewer); + request.setRenewer(renewer); + org.apache.hadoop.yarn.api.records.Token mrDelegationToken = hsProxy.getDelegationToken(request) + .getDelegationToken(); + LOG.debug("Got token to JHS : {0}. 
Converting token.", mrDelegationToken); + return ConverterUtils.convertFromYarn(mrDelegationToken, hsProxy.getConnectAddress()); + } + + /** + * Create an MRClientProtocol to the JHS + * Copied over from ClientCache in Hadoop. + * @return the protocol that can be used to get a token with + * @throws IOException + */ + private MRClientProtocol instantiateHistoryProxy(final Configuration configuration, final ActionExecutor.Context context) + throws IOException { + final String serviceAddr = configuration.get(JHAdminConfig.MR_HISTORY_ADDRESS); + if (StringUtils.isEmpty(serviceAddr)) { + return null; + } + LOG.debug("Connecting to JHS at: " + serviceAddr); + final YarnRPC rpc = YarnRPC.create(configuration); + LOG.debug("Connected to JHS at: " + serviceAddr); + UserGroupInformation currentUser = Services.get().get(UserGroupInformationService.class) + .getProxyUser(context.getWorkflow().getUser()); + return currentUser.doAs(new PrivilegedAction<MRClientProtocol>() { + @Override + public MRClientProtocol run() { + return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class, + NetUtils.createSocketAddr(serviceAddr), configuration); + } + }); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java index 9d1afb5..be05603 100644 --- a/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java +++ b/core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java @@ -51,7 +51,6 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.RemoteException; import 
org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.TaskLog; @@ -60,8 +59,6 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.DiskChecker; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -530,9 +527,9 @@ public class JavaActionExecutor extends ActionExecutor { return conf; } catch (Exception ex) { - LOG.debug( - "Errors when add to DistributedCache. Path=" + Objects.toString(uri, "<null>") + ", archive=" - + archive + ", conf=" + XmlUtils.prettyPrint(conf).toString()); + LOG.debug("Errors when add to DistributedCache. Path=" + + Objects.toString(uri, "<null>") + ", archive=" + archive + ", conf=" + + XmlUtils.prettyPrint(conf).toString()); throw convertException(ex); } } @@ -1021,42 +1018,33 @@ public class JavaActionExecutor extends ActionExecutor { } } - // Setting the credential properties in launcher conf - Configuration credentialsConf = null; - + Credentials credentials = new Credentials(); + Configuration launcherConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); + yarnClient = createYarnClient(context, launcherConf); Map<String, CredentialsProperties> credentialsProperties = setCredentialPropertyToActionConf(context, action, actionConf); - Credentials credentials = null; - if (credentialsProperties != null) { - credentials = new Credentials(); - // Adding if action need to set more credential tokens - credentialsConf = new Configuration(false); - XConfiguration.copy(actionConf, credentialsConf); - setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties); - - // insert conf to action conf from credentialsConf - for (Entry<String, 
String> entry : credentialsConf) { - if (actionConf.get(entry.getKey()) == null) { - actionConf.set(entry.getKey(), entry.getValue()); - } - } + if (UserGroupInformation.isSecurityEnabled()) { + addHadoopCredentialPropertiesToActionConf(credentialsProperties); } - Configuration launcherJobConf = createLauncherConf(actionFs, context, action, actionXml, actionConf); + // Adding if action need to set more credential tokens + Configuration credentialsConf = new Configuration(false); + XConfiguration.copy(actionConf, credentialsConf); + setCredentialTokens(credentials, credentialsConf, context, action, credentialsProperties); + // copy back new entries from credentialsConf + for (Entry<String, String> entry : credentialsConf) { + if (actionConf.get(entry.getKey()) == null) { + actionConf.set(entry.getKey(), entry.getValue()); + } + } String consoleUrl; - String launcherId = LauncherHelper.getRecoveryId(launcherJobConf, context.getActionDir(), context + String launcherId = LauncherHelper.getRecoveryId(launcherConf, context.getActionDir(), context .getRecoveryId()); boolean alreadyRunning = launcherId != null; // if user-retry is on, always submit new launcher boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry(); LOG.debug("Creating yarnClient for action {0}", action.getId()); - yarnClient = createYarnClient(context, launcherJobConf); - - if (UserGroupInformation.isSecurityEnabled()) { - credentials = ensureCredentials(credentials); - acquireHDFSDelegationToken(actionFs, credentialsConf, credentials); - } if (alreadyRunning && !isUserRetry) { try { @@ -1066,51 +1054,16 @@ public class JavaActionExecutor extends ActionExecutor { } catch (RemoteException e) { // caught when the application id does not exist LOG.error("Got RemoteException from YARN", e); - String jobTracker = launcherJobConf.get(HADOOP_YARN_RM); + String jobTracker = launcherConf.get(HADOOP_YARN_RM); throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA017", "unknown job 
[{0}@{1}], cannot recover", launcherId, jobTracker); } } else { - // TODO: OYA: do we actually need an MR token? IIRC, it's issued by the JHS -// // setting up propagation of the delegation token. -// Token<DelegationTokenIdentifier> mrdt = null; -// HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); -// mrdt = jobClient.getDelegationToken(has -// .getMRDelegationTokenRenewer(launcherJobConf)); -// launcherJobConf.getCredentials().addToken(HadoopAccessorService.MR_TOKEN_ALIAS, mrdt); - - // insert credentials tokens to launcher job conf if needed - if (credentialsConf != null) { - for (Token<? extends TokenIdentifier> tk :credentials.getAllTokens()) { - Text fauxAlias = new Text(tk.getKind() + "_" + tk.getService()); - LOG.debug("ADDING TOKEN: " + fauxAlias); - credentials.addToken(fauxAlias, tk); - } - if (credentials.numberOfSecretKeys() > 0) { - for (Entry<String, CredentialsProperties> entry : credentialsProperties.entrySet()) { - CredentialsProperties credProps = entry.getValue(); - if (credProps != null) { - Text credName = new Text(credProps.getName()); - byte[] secKey = credentials.getSecretKey(credName); - if (secKey != null) { - LOG.debug("ADDING CREDENTIAL: " + credProps.getName()); - credentials.addSecretKey(credName, secKey); - } - } - } - } - } - else { - LOG.info("No need to inject credentials."); - } - - String user = context.getWorkflow().getUser(); - YarnClientApplication newApp = yarnClient.createApplication(); ApplicationId appId = newApp.getNewApplicationResponse().getApplicationId(); ApplicationSubmissionContext appContext = - createAppSubmissionContext(appId, launcherJobConf, user, context, actionConf, action.getName(), + createAppSubmissionContext(appId, launcherConf, context, actionConf, action.getName(), credentials, actionXml); yarnClient.submitApplication(appContext); @@ -1120,7 +1073,7 @@ public class JavaActionExecutor extends ActionExecutor { consoleUrl = appReport.getTrackingUrl(); } - String jobTracker = 
launcherJobConf.get(HADOOP_YARN_RM); + String jobTracker = launcherConf.get(HADOOP_YARN_RM); context.setStartData(launcherId, jobTracker, consoleUrl); } catch (Exception ex) { @@ -1146,58 +1099,19 @@ public class JavaActionExecutor extends ActionExecutor { return context.getVar(OOZIE_ACTION_NAME); } - private Credentials ensureCredentials(final Credentials credentials) { - if (credentials == null) { - LOG.debug("No credentials present, creating a new one."); - return new Credentials(); - } - - return credentials; + private void addHadoopCredentialPropertiesToActionConf(Map<String, CredentialsProperties> credentialsProperties) { + LOG.info("Adding default credentials for action: hdfs, yarn and jhs"); + addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.HDFS); + addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.YARN); + addHadoopCredentialProperties(credentialsProperties, CredentialsProviderFactory.JHS); } - /** - * In a secure environment, when both HDFS HA and log aggregation are turned on, {@link JavaActionExecutor} is not able to call - * {@link YarnClient#submitApplication} since {@code HDFS_DELEGATION_TOKEN} is missing. 
- * - * @param actionFs the {@link FileSystem} to get the delegation token from - * @param credentialsConf the {@link Configuration} to extract the YARN renewer - * @param credentials the {@link Credentials} where the delegation token is stored - * @throws IOException - * @throws ActionExecutorException when security is enabled, but either {@code credentials} are empty, or - * {@code serverPrincipal} is empty, or HDFS delegation token is not present within {@code actionFs} - */ - private void acquireHDFSDelegationToken(final FileSystem actionFs, - final Configuration credentialsConf, - final Credentials credentials) - throws IOException, ActionExecutorException { - LOG.debug("Security is enabled, checking credentials to acquire HDFS delegation token."); - - final HadoopAccessorService hadoopAccessorService = Services.get().get(HadoopAccessorService.class); - final String servicePrincipal = hadoopAccessorService.getServicePrincipal(credentialsConf); - final String serverPrincipal = hadoopAccessorService.getServerPrincipal( - credentialsConf, - servicePrincipal); - if (serverPrincipal == null) { - final String errorTemplate = "No server principal present, won't get HDFS delegation token. [servicePrincipal={0}]"; - LOG.error(errorTemplate, servicePrincipal); - throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, servicePrincipal); - } - - LOG.debug("Server principal present, getting HDFS delegation token. [serverPrincipal={0}]", serverPrincipal); - final Token hdfsDelegationToken = actionFs.getDelegationToken(serverPrincipal); - if (hdfsDelegationToken == null) { - final String errorTemplate = "No HDFS delegation token present, won't set credentials. [serverPrincipal={0}]"; - LOG.error(errorTemplate, serverPrincipal); - throw new ActionExecutorException(ActionExecutorException.ErrorType.ERROR, "JA022", errorTemplate, serverPrincipal); - } - - LOG.debug("Got HDFS delegation token, setting credentials. 
[hdfsDelegationToken={0}]", - hdfsDelegationToken); - credentials.addToken(new Text(hdfsDelegationToken.getService().toString()), hdfsDelegationToken); + private void addHadoopCredentialProperties(Map<String, CredentialsProperties> credentialsProperties, String type) { + credentialsProperties.put(type, new CredentialsProperties(type, type)); } private ApplicationSubmissionContext createAppSubmissionContext(ApplicationId appId, Configuration launcherJobConf, - String user, Context context, Configuration actionConf, String actionName, + Context context, Configuration actionConf, String actionName, Credentials credentials, Element actionXml) throws IOException, HadoopAccessorException, URISyntaxException { @@ -1212,12 +1126,14 @@ public class JavaActionExecutor extends ActionExecutor { ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class); + final String user = context.getWorkflow().getUser(); // Set the resources to localize Map<String, LocalResource> localResources = new HashMap<String, LocalResource>(); ClientDistributedCacheManager.determineTimestampsAndCacheVisibilities(launcherJobConf); MRApps.setupDistributedCache(launcherJobConf, localResources); // Add the Launcher and Action configs as Resources HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); + launcherJobConf.set(LauncherAM.OOZIE_SUBMITTER_USER, user); LocalResource launcherJobConfLR = has.createLocalResourceForConfigurationFile(LauncherAM.LAUNCHER_JOB_CONF_XML, user, launcherJobConf, context.getAppFileSystem().getUri(), context.getActionDir()); localResources.put(LauncherAM.LAUNCHER_JOB_CONF_XML, launcherJobConfLR); @@ -1398,37 +1314,38 @@ public class JavaActionExecutor extends ActionExecutor { return envMap; } - protected HashMap<String, CredentialsProperties> setCredentialPropertyToActionConf(Context context, - WorkflowAction action, Configuration actionConf) throws Exception { - HashMap<String, CredentialsProperties> credPropertiesMap = null; 
- if (context != null && action != null) { - if (!"true".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP))) { - XConfiguration wfJobConf = getWorkflowConf(context); - if ("false".equals(actionConf.get(OOZIE_CREDENTIALS_SKIP)) || - !wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))) { - credPropertiesMap = getActionCredentialsProperties(context, action); - if (!credPropertiesMap.isEmpty()) { - for (Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { - if (entry.getValue() != null) { - CredentialsProperties prop = entry.getValue(); - LOG.debug("Credential Properties set for action : " + action.getId()); - for (Entry<String, String> propEntry : prop.getProperties().entrySet()) { - String key = propEntry.getKey(); - String value = propEntry.getValue(); - actionConf.set(key, value); - LOG.debug("property : '" + key + "', value : '" + value + "'"); - } - } - } - } else { - LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + action.getCred()); + Map<String, CredentialsProperties> setCredentialPropertyToActionConf(final Context context, + final WorkflowAction action, + final Configuration actionConf) throws Exception { + final Map<String, CredentialsProperties> credPropertiesMap = new HashMap<>(); + if (context == null || action == null) { + LOG.warn("context or action is null"); + return credPropertiesMap; + } + final XConfiguration wfJobConf = getWorkflowConf(context); + final boolean skipCredentials = actionConf.getBoolean(OOZIE_CREDENTIALS_SKIP, + wfJobConf.getBoolean(OOZIE_CREDENTIALS_SKIP, ConfigurationService.getBoolean(OOZIE_CREDENTIALS_SKIP))); + if (skipCredentials) { + LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); + } else { + credPropertiesMap.putAll(getActionCredentialsProperties(context, action)); + if (credPropertiesMap.isEmpty()) { + LOG.warn("No credential properties found for action : " + action.getId() + ", cred : " + 
action.getCred()); + return credPropertiesMap; + } + for (final Entry<String, CredentialsProperties> entry : credPropertiesMap.entrySet()) { + if (entry.getValue() != null) { + final CredentialsProperties prop = entry.getValue(); + LOG.debug("Credential Properties set for action : " + action.getId()); + for (final Entry<String, String> propEntry : prop.getProperties().entrySet()) { + final String key = propEntry.getKey(); + final String value = propEntry.getValue(); + actionConf.set(key, value); + LOG.debug("property : '" + key + "', value : '" + value + "'"); } - } else { - LOG.info("Skipping credentials (" + OOZIE_CREDENTIALS_SKIP + "=true)"); } } } - return credPropertiesMap; } http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java b/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java new file mode 100644 index 0000000..061ca05 --- /dev/null +++ b/core/src/main/java/org/apache/oozie/action/hadoop/YarnRMCredentials.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.yarn.client.ClientRMProxy; +import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.util.ConverterUtils; +import org.apache.oozie.ErrorCode; +import org.apache.oozie.action.ActionExecutor; +import org.apache.oozie.service.HadoopAccessorService; +import org.apache.oozie.service.Services; +import org.apache.oozie.util.XLog; + +public class YarnRMCredentials implements CredentialsProvider { + /** + * Add an RM_DELEGATION_TOKEN to the {@link Credentials} provided. + * + * @param credentials the credentials object which is updated + * @param config launcher AM configuration + * @param props properties for getting credential token or certificate + * @param context workflow context + * @throws Exception thrown if failed + */ + @Override + public void updateCredentials(Credentials credentials, Configuration config, CredentialsProperties props, + ActionExecutor.Context context) throws Exception { + Text rmDelegationTokenService = ClientRMProxy.getRMDelegationTokenService(config); + if (rmDelegationTokenService == null) { + throw new CredentialException(ErrorCode.E0512, "Can't create RMDelegationTokenService"); + } + try (YarnClient yarnClient = Services.get().get(HadoopAccessorService.class) + .createYarnClient(context.getWorkflow().getUser(), config)) { + org.apache.hadoop.yarn.api.records.Token rmDelegationToken = + yarnClient.getRMDelegationToken(new Text(new HadoopTokenHelper().getServerPrincipal(config))); + if (rmDelegationToken == null) { + throw new CredentialException(ErrorCode.E0512, "Returned token is null"); + } + 
Token<TokenIdentifier> rmToken = ConverterUtils.convertFromYarn(rmDelegationToken, rmDelegationTokenService); + credentials.addToken(rmDelegationTokenService, rmToken); + } catch (Exception e) { + XLog.getLog(getClass()).debug("Exception in updateCredentials", e); + throw e; + } + } + +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java index 187cee2..73300a6 100644 --- a/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java +++ b/core/src/main/java/org/apache/oozie/service/HadoopAccessorService.java @@ -20,23 +20,38 @@ package org.apache.oozie.service; import com.google.common.base.Preconditions; import com.google.common.base.Strings; + +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.Master; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; +import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.yarn.api.records.LocalResource; import 
org.apache.hadoop.yarn.api.records.LocalResourceType; import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.client.ClientRMProxy; import org.apache.hadoop.yarn.client.api.YarnClient; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.ipc.YarnRPC; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.Records; import org.apache.oozie.ErrorCode; +import org.apache.oozie.action.ActionExecutorException; import org.apache.oozie.action.hadoop.JavaActionExecutor; import org.apache.oozie.util.IOUtils; import org.apache.oozie.util.ParamChecker; @@ -55,6 +70,7 @@ import java.lang.reflect.Method; import java.net.InetAddress; import java.net.URI; import java.net.URISyntaxException; +import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import java.util.Arrays; import java.util.Comparator; @@ -84,16 +100,8 @@ public class HadoopAccessorService implements Service { public static final String KERBEROS_AUTH_ENABLED = CONF_PREFIX + "kerberos.enabled"; public static final String KERBEROS_KEYTAB = CONF_PREFIX + "keytab.file"; public static final String KERBEROS_PRINCIPAL = CONF_PREFIX + "kerberos.principal"; - public static final Text MR_TOKEN_ALIAS = new Text("oozie mr token"); - - /** The Kerberos principal for the job tracker.*/ - protected static final String JT_PRINCIPAL = "mapreduce.jobtracker.kerberos.principal"; - /** The Kerberos principal for the resource manager.*/ - protected static final String RM_PRINCIPAL = "yarn.resourcemanager.principal"; - protected static final String HADOOP_YARN_RM = "yarn.resourcemanager.address"; private static final String OOZIE_HADOOP_ACCESSOR_SERVICE_CREATED = "oozie.HadoopAccessorService.created"; - private static final Map<String, Text> mrTokenRenewers = new HashMap<String, Text>(); private static final String 
DEFAULT_ACTIONNAME = "default"; private static Configuration cachedConf; @@ -335,7 +343,7 @@ public class HadoopAccessorService implements Service { return HadoopAccessorService.class; } - private UserGroupInformation getUGI(String user) throws IOException { + UserGroupInformation getUGI(String user) throws IOException { return ugiService.getProxyUser(user); } @@ -650,71 +658,6 @@ public class HadoopAccessorService implements Service { } } - public Text getMRDelegationTokenRenewer(JobConf jobConf) throws IOException { - if (UserGroupInformation.isSecurityEnabled()) { // secure cluster - return getMRTokenRenewerInternal(jobConf); - } - else { - return MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer - } - } - - // Package private for unit test purposes - Text getMRTokenRenewerInternal(JobConf jobConf) throws IOException { - // Getting renewer correctly for JT principal also though JT in hadoop 1.x does not have - // support for renewing/cancelling tokens - final String servicePrincipal = getServicePrincipal(jobConf); - Text renewer; - if (servicePrincipal != null) { // secure cluster - renewer = mrTokenRenewers.get(servicePrincipal); - if (renewer == null) { - renewer = new Text(getServerPrincipal(jobConf, servicePrincipal)); - mrTokenRenewers.put(servicePrincipal, renewer); - } - } - else { - renewer = MR_TOKEN_ALIAS; //Doesn't matter what we pass as renewer - } - return renewer; - } - - public String getServicePrincipal(final Configuration configuration) { - return configuration.get(RM_PRINCIPAL, configuration.get(JT_PRINCIPAL)); - } - - /** - * Mimic {@link org.apache.hadoop.mapred.Master#getMasterPrincipal}, get Kerberos principal for use as delegation token renewer. 
- * - * @param configuration the {@link Configuration} containing the YARN RM address - * @param servicePrincipal the configured service principal - * @return the server principal originating from the host name and the service principal - * @throws IOException when something goes wrong finding out the local address inside - * {@link SecurityUtil#getServerPrincipal(String, String)} - */ - public String getServerPrincipal(final Configuration configuration, final String servicePrincipal) throws IOException { - Preconditions.checkNotNull(configuration, "configuration has to be filled"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(servicePrincipal), "servicePrincipal has to be filled"); - Preconditions.checkArgument(!Strings.isNullOrEmpty(configuration.get(HADOOP_YARN_RM)), - String.format("configuration entry %s has to be filled", HADOOP_YARN_RM)); - - String serverPrincipal; - final String target = configuration.get(HADOOP_YARN_RM); - - try { - final String addr = NetUtils.createSocketAddr(target).getHostName(); - serverPrincipal = SecurityUtil.getServerPrincipal(servicePrincipal, addr); - LOG.info("Delegation Token Renewer details: Principal={0},Target={1}", serverPrincipal, target); - } - catch (final IllegalArgumentException iae) { - LOG.warn("An error happened while trying to get server principal. 
Getting it from service principal anyway.", iae); - - serverPrincipal = servicePrincipal.split("[/@]")[0]; - LOG.info("Delegation Token Renewer for {0} is {1}", target, serverPrincipal); - } - - return serverPrincipal; - } - public void addFileToClassPath(String user, final Path file, final Configuration conf) throws IOException { ParamChecker.notEmpty(user, "user"); http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java new file mode 100644 index 0000000..17cdd9d --- /dev/null +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestHadoopTokenHelper.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.oozie.action.hadoop; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapred.JobConf; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class TestHadoopTokenHelper { + + @Test + public void testGetMRDelegationTokenRenewer() throws Exception { + HadoopTokenHelper hadoopTokenHelper = new HadoopTokenHelper(); + Configuration configuration = new Configuration(false); + configuration.set("yarn.resourcemanager.address", "localhost:8032"); + configuration.set("yarn.resourcemanager.principal", "rm/server....@kdc.domain.com"); + assertEquals("yarn setup","rm/server....@kdc.domain.com", + hadoopTokenHelper.getServerPrincipal(configuration)); + + configuration = new Configuration(false); + configuration.set("yarn.resourcemanager.address", "rm-ha-uri"); + configuration.set("yarn.resourcemanager.principal", "rm/server....@kdc.domain.com"); + assertEquals("yarn ha setup","rm", + hadoopTokenHelper.getServerPrincipal(configuration)); + } +} http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java index 02e60c0..0d1dd97 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestJavaActionExecutor.java @@ -1083,7 +1083,6 @@ public class TestJavaActionExecutor extends ActionExecutorTestCase { // Define 'abc' token type in oozie-site ConfigurationService.set("oozie.credentials.credentialclasses", "abc=org.apache.oozie.action.hadoop.InsertTestToken"); ConfigurationService.setBoolean("oozie.credentials.skip", skipSite); - // Setting the credential properties in launcher conf Map<String, 
CredentialsProperties> credProperties = ae.setCredentialPropertyToActionConf(context, action, actionConf); http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java index a7d6c18..de67365 100644 --- a/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java +++ b/core/src/test/java/org/apache/oozie/action/hadoop/TestShellMain.java @@ -48,7 +48,7 @@ public class TestShellMain extends ShellTestCase { jobConf.setInt("mapred.reduce.max.attempts", 1); jobConf.set("mapred.job.tracker", getJobTrackerUri()); jobConf.set("fs.default.name", getNameNodeUri()); - + jobConf.setLong(LauncherMain.OOZIE_JOB_LAUNCH_TIME, System.currentTimeMillis()); jobConf.set(ShellMain.CONF_OOZIE_SHELL_EXEC, SHELL_COMMAND_NAME); String[] args = new String[] { SHELL_COMMAND_SCRIPTFILE_OPTION, script.toString(), "A", "B" }; http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java index cf77f18..98a41a1 100644 --- a/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java +++ b/core/src/test/java/org/apache/oozie/command/wf/TestWorkflowActionKillXCommand.java @@ -180,7 +180,7 @@ public class TestWorkflowActionKillXCommand extends XDataTestCase { jobConf = sleepjob.setupJobConf(1, 1, sleep, 1, sleep, 1); jobConf.set(LauncherMain.CHILD_MAPREDUCE_JOB_TAGS, "sleepjob"); jobConf.set(LauncherMain.MAPREDUCE_JOB_TAGS, "sleepjob"); - 
System.setProperty(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis())); + jobConf.set(LauncherMain.OOZIE_JOB_LAUNCH_TIME, String.valueOf(System.currentTimeMillis())); jobClient.submitJob(new JobConf(jobConf)); Set<ApplicationId> apps = LauncherMain.getChildYarnJobs(jobConf, ApplicationsRequestScope.ALL); http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java index 960c2f9..89ce185 100644 --- a/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java +++ b/core/src/test/java/org/apache/oozie/service/TestHadoopAccessorService.java @@ -228,37 +228,6 @@ public class TestHadoopAccessorService extends XFsTestCase { } } - public void testGetMRDelegationTokenRenewer() throws Exception { - HadoopAccessorService has = Services.get().get(HadoopAccessorService.class); - JobConf jobConf = new JobConf(false); - assertEquals(new Text("oozie mr token"), has.getMRTokenRenewerInternal(jobConf)); - jobConf.set("yarn.resourcemanager.address", "localhost:50300"); - jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_h...@kdc.domain.com"); - assertEquals(new Text("mapred/localh...@kdc.domain.com"), has.getMRTokenRenewerInternal(jobConf)); - jobConf = new JobConf(false); - jobConf.set("mapreduce.jobtracker.address", "127.0.0.1:50300"); - jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_h...@kdc.domain.com"); - assertEquals(new Text("mapred/localh...@kdc.domain.com"), has.getMRTokenRenewerInternal(jobConf)); - jobConf = new JobConf(false); - jobConf.set("yarn.resourcemanager.address", "localhost:8032"); - jobConf.set("yarn.resourcemanager.principal", "rm/server....@kdc.domain.com"); - assertEquals(new 
Text("rm/server....@kdc.domain.com"), has.getMRTokenRenewerInternal(jobConf)); - - // Try the above with logical URIs (i.e. for HA) - jobConf = new JobConf(false); - jobConf.set("mapred.job.tracker", "jt-ha-uri"); - jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_h...@kdc.domain.com"); - assertEquals(new Text("mapred/localh...@kdc.domain.com"), has.getMRTokenRenewerInternal(jobConf)); - jobConf = new JobConf(false); - jobConf.set("mapreduce.jobtracker.address", "jt-ha-uri"); - jobConf.set("mapreduce.jobtracker.kerberos.principal", "mapred/_h...@kdc.domain.com"); - assertEquals(new Text("mapred/localh...@kdc.domain.com"), has.getMRTokenRenewerInternal(jobConf)); - jobConf = new JobConf(false); - jobConf.set("yarn.resourcemanager.address", "rm-ha-uri"); - jobConf.set("yarn.resourcemanager.principal", "rm/server....@kdc.domain.com"); - assertEquals(new Text("rm/server....@kdc.domain.com"), has.getMRTokenRenewerInternal(jobConf)); - } - public void testCheckSupportedFilesystem() throws Exception { Configuration hConf = Services.get().getConf(); http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/release-log.txt ---------------------------------------------------------------------- diff --git a/release-log.txt b/release-log.txt index 82621ca..bacd686 100644 --- a/release-log.txt +++ b/release-log.txt @@ -1,5 +1,6 @@ -- Oozie 5.0.0 release (trunk - unreleased) +OOZIE-2909 LauncherAM: rewrite UGI calls (gezapeti) OOZIE-2687 Create XML schema for launcher configurations (asasvari) OOZIE-3041 TestWorkflowActionRetryInfoXCommand fails in oozie core module (andras.piros via gezapeti) OOZIE-2916 Set a job name for the MR Action's child job (asasvari) http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java ---------------------------------------------------------------------- diff --git 
a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java index b4cbb4b..bfb3d76 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/AMRMClientAsyncFactory.java @@ -22,11 +22,9 @@ import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; public class AMRMClientAsyncFactory { - public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs) { + public AMRMClientAsync<?> createAMRMClientAsync(int intervalMs, AMRMCallBackHandler callBackHandler) { AMRMClient<?> amRmClient = AMRMClient.createAMRMClient(); - AMRMCallBackHandler callBackHandler = new AMRMCallBackHandler(); AMRMClientAsync<?> amRmClientAsync = AMRMClientAsync.createAMRMClientAsync(amRmClient, intervalMs, callBackHandler); - return amRmClientAsync; } } http://git-wip-us.apache.org/repos/asf/oozie/blob/9cb4bd05/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java ---------------------------------------------------------------------- diff --git a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java index 874d371..fcb0a92 100644 --- a/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java +++ b/sharelib/oozie/src/main/java/org/apache/oozie/action/hadoop/HdfsOperations.java @@ -40,85 +40,59 @@ import com.google.common.base.Preconditions; public class HdfsOperations { private static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; private final SequenceFileWriterFactory seqFileWriterFactory; - private final UserGroupInformation ugi; - public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory, UserGroupInformation ugi) { + public HdfsOperations(SequenceFileWriterFactory seqFileWriterFactory) { 
this.seqFileWriterFactory = Preconditions.checkNotNull(seqFileWriterFactory, "seqFileWriterFactory should not be null"); - this.ugi = Preconditions.checkNotNull(ugi, "ugi should not be null"); } /** * Creates a Sequence file which contains the output from an action and uploads it to HDFS. */ public void uploadActionDataToHDFS(final Configuration launcherJobConf, final Path actionDir, - final Map<String, String> actionData) throws IOException, InterruptedException { - ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE); - // upload into sequence file - System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri()); + final Map<String, String> actionData) throws IOException, InterruptedException { + Path finalPath = new Path(actionDir, LauncherAM.ACTION_DATA_SEQUENCE_FILE); + // upload into sequence file + System.out.println("Oozie Launcher, uploading action data to HDFS sequence file: " + finalPath.toUri()); - try (SequenceFile.Writer wr = - seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) { + try (SequenceFile.Writer wr = + seqFileWriterFactory.createSequenceFileWriter(launcherJobConf, finalPath, Text.class, Text.class)) { - if (wr != null) { - for (Entry<String, String> entry : actionData.entrySet()) { - wr.append(new Text(entry.getKey()), new Text(entry.getValue())); - } - } else { - throw new IOException("SequenceFile.Writer is null for " + finalPath); - } + if (wr != null) { + for (Entry<String, String> entry : actionData.entrySet()) { + wr.append(new Text(entry.getKey()), new Text(entry.getValue())); } - - return null; + } else { + throw new IOException("SequenceFile.Writer is null for " + finalPath); } - }); + } } public boolean fileExists(final Path path, final Configuration launcherJobConf) throws IOException, InterruptedException { - return 
ugi.doAs(new PrivilegedExceptionAction<Boolean>() { - @Override - public Boolean run() throws Exception { - FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf); - return fs.exists(path); - } - }); + FileSystem fs = FileSystem.get(path.toUri(), launcherJobConf); + return fs.exists(path); } public void writeStringToFile(final Path path, final Configuration conf, final String contents) throws IOException, InterruptedException { - ugi.doAs(new PrivilegedExceptionAction<Void>() { - @Override - public Void run() throws Exception { - try (FileSystem fs = FileSystem.get(path.toUri(), conf); - java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) { - writer.write(contents); - } - - return null; - } - }); + try (FileSystem fs = FileSystem.get(path.toUri(), conf); + java.io.Writer writer = new OutputStreamWriter(fs.create(path), DEFAULT_CHARSET)) { + writer.write(contents); + } } public String readFileContents(final Path path, final Configuration conf) throws IOException, InterruptedException { - return ugi.doAs(new PrivilegedExceptionAction<String>() { - @Override - public String run() throws Exception { - StringBuilder sb = new StringBuilder(); - - try (FileSystem fs = FileSystem.get(path.toUri(), conf); - InputStream is = fs.open(path); - BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) { + StringBuilder sb = new StringBuilder(); - String contents; - while ((contents = reader.readLine()) != null) { - sb.append(contents); - } - } + try (FileSystem fs = FileSystem.get(path.toUri(), conf); + InputStream is = fs.open(path); + BufferedReader reader = new BufferedReader(new InputStreamReader(is, DEFAULT_CHARSET))) { - return sb.toString(); + String contents; + while ((contents = reader.readLine()) != null) { + sb.append(contents); } - }); + } + + return sb.toString(); } }