Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java?rev=1429097&r1=1429096&r2=1429097&view=diff ============================================================================== --- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java (original) +++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Fri Jan 4 20:28:15 2013 @@ -18,19 +18,29 @@ package org.apache.hadoop.mapreduce.security; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + import java.io.IOException; +import java.lang.reflect.UndeclaredThrowableException; +import java.net.InetSocketAddress; import java.security.PrivilegedAction; import java.security.PrivilegedExceptionAction; import junit.framework.Assert; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.mapred.JobConf; import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol; import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest; import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest; +import 
org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest; +import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager; import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; @@ -49,6 +59,8 @@ import org.junit.Test; public class TestJHSSecurity { + private static final Log LOG = LogFactory.getLog(TestJHSSecurity.class); + @Test public void testDelegationToken() throws IOException, InterruptedException { @@ -63,55 +75,211 @@ public class TestJHSSecurity { conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION, "kerberos"); UserGroupInformation.setConfiguration(conf); - - final JobHistoryServer jobHistoryServer = new JobHistoryServer() { - protected void doSecureLogin(Configuration conf) throws IOException { - // no keytab based login + + final long initialInterval = 10000l; + final long maxLifetime= 20000l; + final long renewInterval = 10000l; + + JobHistoryServer jobHistoryServer = null; + MRClientProtocol clientUsingDT = null; + long tokenFetchTime; + try { + jobHistoryServer = new JobHistoryServer() { + protected void doSecureLogin(Configuration conf) throws IOException { + // no keytab based login + }; + + protected JHSDelegationTokenSecretManager createJHSSecretManager( + Configuration conf) { + return new JHSDelegationTokenSecretManager(initialInterval, + maxLifetime, renewInterval, 3600000); + } }; - }; - jobHistoryServer.init(conf); - jobHistoryServer.start(); - - // Fake the authentication-method - UserGroupInformation loggedInUser = UserGroupInformation.getCurrentUser(); - loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS); +// final JobHistoryServer jobHistoryServer = jhServer; + jobHistoryServer.init(conf); + jobHistoryServer.start(); + final MRClientProtocol hsService = jobHistoryServer.getClientService() + .getClientHandler(); + + // Fake the 
authentication-method + UserGroupInformation loggedInUser = UserGroupInformation + .createRemoteUser("testrenewer@APACHE.ORG"); + Assert.assertEquals("testrenewer", loggedInUser.getShortUserName()); + // Default realm is APACHE.ORG + loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS); + + + DelegationToken token = + getDelegationToken(loggedInUser, hsService, + loggedInUser.getShortUserName()); + tokenFetchTime = System.currentTimeMillis(); + LOG.info("Got delegation token at: " + tokenFetchTime); + + // Now try talking to JHS using the delegation token + clientUsingDT = getMRClientProtocol(token, jobHistoryServer + .getClientService().getBindAddress(), "TheDarkLord", conf); + + GetJobReportRequest jobReportRequest = + Records.newRecord(GetJobReportRequest.class); + jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1)); + try { + clientUsingDT.getJobReport(jobReportRequest); + } catch (YarnRemoteException e) { + Assert.assertEquals("Unknown job job_123456_0001", e.getMessage()); + } + + // Renew after 50% of token age. + while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) { + Thread.sleep(500l); + } + long nextExpTime = renewDelegationToken(loggedInUser, hsService, token); + long renewalTime = System.currentTimeMillis(); + LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: " + + nextExpTime); + + // Wait for first expiry, but before renewed expiry. + while (System.currentTimeMillis() > tokenFetchTime + initialInterval + && System.currentTimeMillis() < nextExpTime) { + Thread.sleep(500l); + } + Thread.sleep(50l); + + // Valid token because of renewal. + try { + clientUsingDT.getJobReport(jobReportRequest); + } catch (YarnRemoteException e) { + Assert.assertEquals("Unknown job job_123456_0001", e.getMessage()); + } + + // Wait for expiry. 
+ while(System.currentTimeMillis() < renewalTime + renewInterval) { + Thread.sleep(500l); + } + Thread.sleep(50l); + LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid"); + // Token should have expired. + try { + clientUsingDT.getJobReport(jobReportRequest); + fail("Should not have succeeded with an expired token"); + } catch (YarnRemoteException e) { + assertTrue(e.getMessage().contains("is expired")); + } catch (UndeclaredThrowableException ute) { + assertTrue(ute.getCause().getMessage().contains("is expired")); + } + + // Test cancellation + // Stop the existing proxy, start another. + if (clientUsingDT != null) { +// RPC.stopProxy(clientUsingDT); + clientUsingDT = null; + } + token = + getDelegationToken(loggedInUser, hsService, + loggedInUser.getShortUserName()); + tokenFetchTime = System.currentTimeMillis(); + LOG.info("Got delegation token at: " + tokenFetchTime); + + // Now try talking to HSService using the delegation token + clientUsingDT = getMRClientProtocol(token, jobHistoryServer + .getClientService().getBindAddress(), "loginuser2", conf); + + + try { + clientUsingDT.getJobReport(jobReportRequest); + } catch (YarnRemoteException e) { + Assert.assertEquals("Unknown job job_123456_0001", e.getMessage()); + } + cancelDelegationToken(loggedInUser, hsService, token); + if (clientUsingDT != null) { +// RPC.stopProxy(clientUsingDT); + clientUsingDT = null; + } + + // Creating a new connection. + clientUsingDT = getMRClientProtocol(token, jobHistoryServer + .getClientService().getBindAddress(), "loginuser2", conf); + LOG.info("Cancelled delegation token at: " + System.currentTimeMillis()); + // Verify cancellation worked. 
+ try { + clientUsingDT.getJobReport(jobReportRequest); + fail("Should not have succeeded with a cancelled delegation token"); + } catch (YarnRemoteException e) { + } catch (UndeclaredThrowableException ute) { + } + } finally { + jobHistoryServer.stop(); + } + } + private DelegationToken getDelegationToken( + final UserGroupInformation loggedInUser, + final MRClientProtocol hsService, final String renewerString) + throws IOException, InterruptedException { // Get the delegation token directly as it is a little difficult to setup // the kerberos based rpc. - DelegationToken token = - loggedInUser.doAs(new PrivilegedExceptionAction<DelegationToken>() { + DelegationToken token = loggedInUser + .doAs(new PrivilegedExceptionAction<DelegationToken>() { + @Override public DelegationToken run() throws YarnRemoteException { - GetDelegationTokenRequest request = - Records.newRecord(GetDelegationTokenRequest.class); - request.setRenewer("OneRenewerToRuleThemAll"); - return jobHistoryServer.getClientService().getClientHandler() - .getDelegationToken(request).getDelegationToken(); + GetDelegationTokenRequest request = Records + .newRecord(GetDelegationTokenRequest.class); + request.setRenewer(renewerString); + return hsService.getDelegationToken(request).getDelegationToken(); } + }); + return token; + } + + private long renewDelegationToken(final UserGroupInformation loggedInUser, + final MRClientProtocol hsService, final DelegationToken dToken) + throws IOException, InterruptedException { + long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() { + + @Override + public Long run() throws YarnRemoteException { + RenewDelegationTokenRequest request = Records + .newRecord(RenewDelegationTokenRequest.class); + request.setDelegationToken(dToken); + return hsService.renewDelegationToken(request).getNextExpirationTime(); + } + }); + return nextExpTime; + } + + private void cancelDelegationToken(final UserGroupInformation loggedInUser, + final MRClientProtocol 
hsService, final DelegationToken dToken) + throws IOException, InterruptedException { + + loggedInUser.doAs(new PrivilegedExceptionAction<Void>() { + @Override + public Void run() throws YarnRemoteException { + CancelDelegationTokenRequest request = Records + .newRecord(CancelDelegationTokenRequest.class); + request.setDelegationToken(dToken); + hsService.cancelDelegationToken(request); + return null; + } + }); + } + + private MRClientProtocol getMRClientProtocol(DelegationToken token, + final InetSocketAddress hsAddress, String user, final Configuration conf) { + UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user); + ugi.addToken(ProtoUtils.convertFromProtoFormat(token, hsAddress)); - // Now try talking to JHS using the delegation token - UserGroupInformation ugi = - UserGroupInformation.createRemoteUser("TheDarkLord"); - ugi.addToken(ProtoUtils.convertFromProtoFormat( - token, jobHistoryServer.getClientService().getBindAddress())); final YarnRPC rpc = YarnRPC.create(conf); - MRClientProtocol userUsingDT = - ugi.doAs(new PrivilegedAction<MRClientProtocol>() { + MRClientProtocol hsWithDT = ugi + .doAs(new PrivilegedAction<MRClientProtocol>() { + @Override public MRClientProtocol run() { return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class, - jobHistoryServer.getClientService().getBindAddress(), conf); + hsAddress, conf); } }); - GetJobReportRequest jobReportRequest = - Records.newRecord(GetJobReportRequest.class); - jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1)); - try { - userUsingDT.getJobReport(jobReportRequest); - } catch (YarnRemoteException e) { - Assert.assertEquals("Unknown job job_123456_0001", e.getMessage()); - } + return hsWithDT; } }