Author: cos Date: Wed Apr 3 01:39:42 2013 New Revision: 1463804 URL: http://svn.apache.org/r1463804 Log: MAPREDUCE-5088. MR Client gets a renewer token exception while Oozie is submitting a job (daryn)
Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt?rev=1463804&r1=1463803&r2=1463804&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt (original) +++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/CHANGES.txt Wed Apr 3 01:39:42 2013 @@ -12,6 +12,9 @@ Release 2.0.4-beta - UNRELEASED BUG FIXES + MAPREDUCE-5088. 
MR Client gets an renewer token exception while Oozie is + submitting a job (Daryn Sharp via cos) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java?rev=1463804&r1=1463803&r2=1463804&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java (original) +++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/JobClient.java Wed Apr 3 01:39:42 2013 @@ -138,15 +138,6 @@ import org.apache.hadoop.util.ToolRunner public class JobClient extends CLI { public static enum TaskStatusFilter { NONE, KILLED, FAILED, SUCCEEDED, ALL } private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED; - /* notes that get delegation token was called. Again this is hack for oozie - * to make sure we add history server delegation tokens to the credentials - * for the job. Since the api only allows one delegation token to be returned, - * we have to add this hack. 
- */ - private boolean getDelegationTokenCalled = false; - /* do we need a HS delegation token for this client */ - static final String HS_DELEGATION_TOKEN_REQUIRED - = "mapreduce.history.server.delegationtoken.required"; static{ ConfigUtil.loadResources(); @@ -569,10 +560,6 @@ public class JobClient extends CLI { try { conf.setBooleanIfUnset("mapred.mapper.new-api", false); conf.setBooleanIfUnset("mapred.reducer.new-api", false); - if (getDelegationTokenCalled) { - conf.setBoolean(HS_DELEGATION_TOKEN_REQUIRED, getDelegationTokenCalled); - getDelegationTokenCalled = false; - } Job job = clientUgi.doAs(new PrivilegedExceptionAction<Job> () { @Override public Job run() throws IOException, ClassNotFoundException, @@ -1171,7 +1158,6 @@ public class JobClient extends CLI { */ public Token<DelegationTokenIdentifier> getDelegationToken(final Text renewer) throws IOException, InterruptedException { - getDelegationTokenCalled = true; return clientUgi.doAs(new PrivilegedExceptionAction<Token<DelegationTokenIdentifier>>() { public Token<DelegationTokenIdentifier> run() throws IOException, Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java?rev=1463804&r1=1463803&r2=1463804&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java (original) +++ 
hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/ResourceMgrDelegate.java Wed Apr 3 01:39:42 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.mapred; import java.io.IOException; +import java.net.InetSocketAddress; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -87,6 +88,10 @@ public class ResourceMgrDelegate extends return oldMetrics; } + InetSocketAddress getConnectAddress() { + return rmAddress; + } + @SuppressWarnings("rawtypes") public Token getDelegationToken(Text renewer) throws IOException, InterruptedException { Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java?rev=1463804&r1=1463803&r2=1463804&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java (original) +++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java Wed Apr 3 01:39:42 2013 @@ -60,6 +60,7 @@ import org.apache.hadoop.mapreduce.v2.ap import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; import org.apache.hadoop.mapreduce.v2.util.MRApps; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import 
org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.security.token.Token; @@ -81,6 +82,7 @@ import org.apache.hadoop.yarn.api.record import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.client.RMTokenSelector; import org.apache.hadoop.yarn.util.BuilderUtils; import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.ProtoUtils; @@ -90,7 +92,7 @@ import com.google.common.annotations.Vis /** * This class enables the current JobClient (0.22 hadoop) to run on YARN. */ -@SuppressWarnings({ "rawtypes", "unchecked" }) +@SuppressWarnings("unchecked") public class YARNRunner implements ClientProtocol { private static final Log LOG = LogFactory.getLog(YARNRunner.class); @@ -101,14 +103,6 @@ public class YARNRunner implements Clien private Configuration conf; private final FileContext defaultFileContext; - /* usually is false unless the jobclient get delegation token is - * called. This is a hack wherein we do return a token from RM - * on getDelegationtoken but due to the restricted api on jobclient - * we just add a job history DT token when submitting a job. - */ - private static final boolean DEFAULT_HS_DELEGATION_TOKEN_REQUIRED = - false; - /** * Yarn runner incapsulates the client interface of * yarn @@ -186,6 +180,28 @@ public class YARNRunner implements Clien } @VisibleForTesting + void addHistoyToken(Credentials ts) throws IOException, InterruptedException { + /* check if we have a hsproxy, if not, no need */ + MRClientProtocol hsProxy = clientCache.getInitializedHSProxy(); + if (UserGroupInformation.isSecurityEnabled() && (hsProxy != null)) { + /* + * note that get delegation token was called. 
Again this is hack for oozie + * to make sure we add history server delegation tokens to the credentials + */ + RMTokenSelector tokenSelector = new RMTokenSelector(); + Text service = SecurityUtil.buildTokenService(resMgrDelegate + .getConnectAddress()); + if (tokenSelector.selectToken(service, ts.getAllTokens()) != null) { + Text hsService = SecurityUtil.buildTokenService(hsProxy + .getConnectAddress()); + if (ts.getToken(hsService) == null) { + ts.addToken(hsService, getDelegationTokenFromHS(hsProxy)); + } + } + } + } + + @VisibleForTesting Token<?> getDelegationTokenFromHS(MRClientProtocol hsProxy) throws IOException, InterruptedException { GetDelegationTokenRequest request = recordFactory @@ -263,18 +279,8 @@ public class YARNRunner implements Clien public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) throws IOException, InterruptedException { - /* check if we have a hsproxy, if not, no need */ - MRClientProtocol hsProxy = clientCache.getInitializedHSProxy(); - if (hsProxy != null) { - // JobClient will set this flag if getDelegationToken is called, if so, get - // the delegation tokens for the HistoryServer also. 
- if (conf.getBoolean(JobClient.HS_DELEGATION_TOKEN_REQUIRED, - DEFAULT_HS_DELEGATION_TOKEN_REQUIRED)) { - Token hsDT = getDelegationTokenFromHS(hsProxy); - ts.addToken(hsDT.getService(), hsDT); - } - } - + addHistoyToken(ts); + // Upload only in security mode: TODO Path applicationTokensFile = new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE); Modified: hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java?rev=1463804&r1=1463803&r2=1463804&view=diff ============================================================================== --- hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java (original) +++ hadoop/common/branches/branch-2.0.4-alpha/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapred/TestYARNRunner.java Wed Apr 3 01:39:42 2013 @@ -20,8 +20,10 @@ package org.apache.hadoop.mapred; import static org.mockito.Matchers.any; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -30,6 +32,7 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; +import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.List; @@ -39,28 +42,24 @@ import 
junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.FileContext; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.mapred.ClientCache; -import org.apache.hadoop.mapred.ClientServiceDelegate; -import org.apache.hadoop.mapred.JobConf; -import org.apache.hadoop.mapred.Master; -import org.apache.hadoop.mapred.ResourceMgrDelegate; -import org.apache.hadoop.mapred.YARNRunner; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.JobID; import org.apache.hadoop.mapreduce.JobPriority; import org.apache.hadoop.mapreduce.JobStatus.State; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.TypeConverter; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.MRDelegationTokenIdentifier; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenResponse; import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.ClientRMProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; -import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; -import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetAllApplicationsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; @@ -69,21 +68,27 @@ import org.apache.hadoop.yarn.api.protoc import 
org.apache.hadoop.yarn.api.protocolrecords.GetClusterMetricsResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetClusterNodesResponse; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoRequest; import org.apache.hadoop.yarn.api.protocolrecords.GetQueueUserAclsInfoResponse; +import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationReport; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.DelegationToken; import org.apache.hadoop.yarn.api.records.QueueInfo; -import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.api.records.YarnClusterMetrics; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.Records; import org.apache.log4j.Appender; import org.apache.log4j.Layout; import org.apache.log4j.Logger; @@ -146,7 +151,7 @@ public class TestYARNRunner extends Test } - @Test + @Test(timeout=20000) public void testJobKill() throws Exception { clientDelegate = mock(ClientServiceDelegate.class); 
when(clientDelegate.getJobStatus(any(JobID.class))).thenReturn(new @@ -171,7 +176,7 @@ public class TestYARNRunner extends Test verify(clientDelegate).killJob(jobId); } - @Test + @Test(timeout=20000) public void testJobSubmissionFailure() throws Exception { when(resourceMgrDelegate.submitApplication(any(ApplicationSubmissionContext.class))). thenReturn(appId); @@ -193,7 +198,7 @@ public class TestYARNRunner extends Test } } - @Test + @Test(timeout=20000) public void testResourceMgrDelegate() throws Exception { /* we not want a mock of resource mgr delegate */ final ClientRMProtocol clientRMProtocol = mock(ClientRMProtocol.class); @@ -259,8 +264,88 @@ public class TestYARNRunner extends Test delegate.getQueueAclsForCurrentUser(); verify(clientRMProtocol).getQueueUserAcls(any(GetQueueUserAclsInfoRequest.class)); } - - @Test + + @Test(timeout=20000) + public void testGetHSDelegationToken() throws Exception { + try { + Configuration conf = new Configuration(); + + // Setup mock service + InetSocketAddress mockRmAddress = new InetSocketAddress("localhost", 4444); + Text rmTokenSevice = SecurityUtil.buildTokenService(mockRmAddress); + + InetSocketAddress mockHsAddress = new InetSocketAddress("localhost", 9200); + Text hsTokenSevice = SecurityUtil.buildTokenService(mockHsAddress); + + // Setup mock rm token + RMDelegationTokenIdentifier tokenIdentifier = new RMDelegationTokenIdentifier( + new Text("owner"), new Text("renewer"), new Text("real")); + Token<RMDelegationTokenIdentifier> token = new Token<RMDelegationTokenIdentifier>( + new byte[0], new byte[0], tokenIdentifier.getKind(), rmTokenSevice); + token.setKind(RMDelegationTokenIdentifier.KIND_NAME); + + // Setup mock history token + DelegationToken historyToken = BuilderUtils.newDelegationToken( + new byte[0], MRDelegationTokenIdentifier.KIND_NAME.toString(), + new byte[0], hsTokenSevice.toString()); + GetDelegationTokenResponse getDtResponse = Records + .newRecord(GetDelegationTokenResponse.class); + 
getDtResponse.setDelegationToken(historyToken); + + // mock services + MRClientProtocol mockHsProxy = mock(MRClientProtocol.class); + doReturn(mockHsAddress).when(mockHsProxy).getConnectAddress(); + doReturn(getDtResponse).when(mockHsProxy).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + ResourceMgrDelegate rmDelegate = mock(ResourceMgrDelegate.class); + doReturn(mockRmAddress).when(rmDelegate).getConnectAddress(); + + ClientCache clientCache = mock(ClientCache.class); + doReturn(mockHsProxy).when(clientCache).getInitializedHSProxy(); + + Credentials creds = new Credentials(); + + YARNRunner yarnRunner = new YARNRunner(conf, rmDelegate, clientCache); + + // No HS token if no RM token + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // No HS token if RM token, but secirity disabled. + creds.addToken(new Text("rmdt"), token); + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + conf.set(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION, + "kerberos"); + UserGroupInformation.setConfiguration(conf); + creds = new Credentials(); + + // No HS token if no RM token, security enabled + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(0)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // HS token if RM token present, security enabled + creds.addToken(new Text("rmdt"), token); + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(1)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + + // No additional call to get HS token if RM and HS token present + yarnRunner.addHistoyToken(creds); + verify(mockHsProxy, times(1)).getDelegationToken( + any(GetDelegationTokenRequest.class)); + } finally { + // Back to defaults. 
+ UserGroupInformation.setConfiguration(new Configuration()); + } + } + + @Test(timeout=20000) public void testHistoryServerToken() throws Exception { //Set the master principal in the config conf.set(YarnConfiguration.RM_PRINCIPAL,"foo@LOCAL"); @@ -303,7 +388,7 @@ public class TestYARNRunner extends Test }); } - @Test + @Test(timeout=20000) public void testAMAdminCommandOpts() throws Exception { JobConf jobConf = new JobConf(); @@ -366,7 +451,7 @@ public class TestYARNRunner extends Test assertTrue("AM admin command opts is after user command opts.", adminIndex < userIndex); } } - @Test + @Test(timeout=20000) public void testWarnCommandOpts() throws Exception { Logger logger = Logger.getLogger(YARNRunner.class);