Modified: 
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java?rev=1429097&r1=1429096&r2=1429097&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/security/TestJHSSecurity.java Fri Jan  4 20:28:15 2013
@@ -18,19 +18,29 @@
 
 package org.apache.hadoop.mapreduce.security;
 
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import java.io.IOException;
+import java.lang.reflect.UndeclaredThrowableException;
+import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 
 import junit.framework.Assert;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapreduce.v2.api.HSClientProtocol;
 import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.CancelDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetDelegationTokenRequest;
 import org.apache.hadoop.mapreduce.v2.api.protocolrecords.GetJobReportRequest;
+import org.apache.hadoop.mapreduce.v2.api.protocolrecords.RenewDelegationTokenRequest;
+import org.apache.hadoop.mapreduce.v2.hs.JHSDelegationTokenSecretManager;
 import org.apache.hadoop.mapreduce.v2.hs.JobHistoryServer;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -49,6 +59,8 @@ import org.junit.Test;
 
 public class TestJHSSecurity {
 
+  private static final Log LOG = LogFactory.getLog(TestJHSSecurity.class);
+  
   @Test
   public void testDelegationToken() throws IOException, InterruptedException {
 
@@ -63,55 +75,211 @@ public class TestJHSSecurity {
     conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
       "kerberos");
     UserGroupInformation.setConfiguration(conf);
-
-    final JobHistoryServer jobHistoryServer = new JobHistoryServer() {
-      protected void doSecureLogin(Configuration conf) throws IOException {
-        // no keytab based login
+    
+    final long initialInterval = 10000l;
+    final long maxLifetime= 20000l;
+    final long renewInterval = 10000l;
+
+    JobHistoryServer jobHistoryServer = null;
+    MRClientProtocol clientUsingDT = null;
+    long tokenFetchTime;
+    try {
+      jobHistoryServer = new JobHistoryServer() {
+        protected void doSecureLogin(Configuration conf) throws IOException {
+          // no keytab based login
+        };
+
+        protected JHSDelegationTokenSecretManager createJHSSecretManager(
+            Configuration conf) {
+          return new JHSDelegationTokenSecretManager(initialInterval, 
+              maxLifetime, renewInterval, 3600000);
+        }
       };
-    };
-    jobHistoryServer.init(conf);
-    jobHistoryServer.start();
-
-    // Fake the authentication-method
-    UserGroupInformation loggedInUser = UserGroupInformation.getCurrentUser();
-    loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
+//      final JobHistoryServer jobHistoryServer = jhServer;
+      jobHistoryServer.init(conf);
+      jobHistoryServer.start();
+      final MRClientProtocol hsService = jobHistoryServer.getClientService()
+          .getClientHandler();
+
+      // Fake the authentication-method
+      UserGroupInformation loggedInUser = UserGroupInformation
+          .createRemoteUser("testrenewer@APACHE.ORG");
+      Assert.assertEquals("testrenewer", loggedInUser.getShortUserName());
+      // Default realm is APACHE.ORG
+      loggedInUser.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
+
+
+      DelegationToken token =
+          getDelegationToken(loggedInUser, hsService,
+              loggedInUser.getShortUserName());
+      tokenFetchTime = System.currentTimeMillis();
+      LOG.info("Got delegation token at: " + tokenFetchTime);
+
+      // Now try talking to JHS using the delegation token
+      clientUsingDT = getMRClientProtocol(token, jobHistoryServer
+          .getClientService().getBindAddress(), "TheDarkLord", conf);
+
+      GetJobReportRequest jobReportRequest =
+          Records.newRecord(GetJobReportRequest.class);
+      jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
+      try {
+        clientUsingDT.getJobReport(jobReportRequest);
+      } catch (YarnRemoteException e) {
+        Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
+      }
+      
+   // Renew after 50% of token age.
+      while(System.currentTimeMillis() < tokenFetchTime + initialInterval / 2) {
+        Thread.sleep(500l);
+      }
+      long nextExpTime = renewDelegationToken(loggedInUser, hsService, token);
+      long renewalTime = System.currentTimeMillis();
+      LOG.info("Renewed token at: " + renewalTime + ", NextExpiryTime: "
+          + nextExpTime);
+      
+      // Wait for first expiry, but before renewed expiry.
+      while (System.currentTimeMillis() > tokenFetchTime + initialInterval
+          && System.currentTimeMillis() < nextExpTime) {
+        Thread.sleep(500l);
+      }
+      Thread.sleep(50l);
+      
+      // Valid token because of renewal.
+      try {
+        clientUsingDT.getJobReport(jobReportRequest);
+      } catch (YarnRemoteException e) {
+        Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
+      }
+      
+      // Wait for expiry.
+      while(System.currentTimeMillis() < renewalTime + renewInterval) {
+        Thread.sleep(500l);
+      }
+      Thread.sleep(50l);
+      LOG.info("At time: " + System.currentTimeMillis() + ", token should be invalid");
+      // Token should have expired.      
+      try {
+        clientUsingDT.getJobReport(jobReportRequest);
+        fail("Should not have succeeded with an expired token");
+      } catch (YarnRemoteException e) {
+        assertTrue(e.getMessage().contains("is expired"));
+      } catch (UndeclaredThrowableException ute) {
+        assertTrue(ute.getCause().getMessage().contains("is expired"));
+      }
+      
+      // Test cancellation
+      // Stop the existing proxy, start another.
+      if (clientUsingDT != null) {
+//        RPC.stopProxy(clientUsingDT);
+        clientUsingDT = null;
+      }
+      token =
+          getDelegationToken(loggedInUser, hsService,
+              loggedInUser.getShortUserName());
+      tokenFetchTime = System.currentTimeMillis();
+      LOG.info("Got delegation token at: " + tokenFetchTime);
+ 
+      // Now try talking to HSService using the delegation token
+      clientUsingDT = getMRClientProtocol(token, jobHistoryServer
+          .getClientService().getBindAddress(), "loginuser2", conf);
+
+      
+      try {
+        clientUsingDT.getJobReport(jobReportRequest);
+      } catch (YarnRemoteException e) {
+        Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
+      }
+      cancelDelegationToken(loggedInUser, hsService, token);
+      if (clientUsingDT != null) {
+//        RPC.stopProxy(clientUsingDT);
+        clientUsingDT = null;
+      } 
+      
+      // Creating a new connection.
+      clientUsingDT = getMRClientProtocol(token, jobHistoryServer
+          .getClientService().getBindAddress(), "loginuser2", conf);
+      LOG.info("Cancelled delegation token at: " + System.currentTimeMillis());
+      // Verify cancellation worked.
+      try {
+        clientUsingDT.getJobReport(jobReportRequest);
+        fail("Should not have succeeded with a cancelled delegation token");
+      } catch (YarnRemoteException e) {
+      } catch (UndeclaredThrowableException ute) {
+      }
+    } finally {
+      jobHistoryServer.stop();
+    }
+  }
 
+  private DelegationToken getDelegationToken(
+      final UserGroupInformation loggedInUser,
+      final MRClientProtocol hsService, final String renewerString)
+      throws IOException, InterruptedException {
     // Get the delegation token directly as it is a little difficult to setup
     // the kerberos based rpc.
-    DelegationToken token =
-        loggedInUser.doAs(new PrivilegedExceptionAction<DelegationToken>() {
+    DelegationToken token = loggedInUser
+        .doAs(new PrivilegedExceptionAction<DelegationToken>() {
+
           @Override
           public DelegationToken run() throws YarnRemoteException {
-            GetDelegationTokenRequest request =
-                Records.newRecord(GetDelegationTokenRequest.class);
-            request.setRenewer("OneRenewerToRuleThemAll");
-            return jobHistoryServer.getClientService().getClientHandler()
-              .getDelegationToken(request).getDelegationToken();
+            GetDelegationTokenRequest request = Records
+                .newRecord(GetDelegationTokenRequest.class);
+            request.setRenewer(renewerString);
+            return hsService.getDelegationToken(request).getDelegationToken();
           }
+
         });
+    return token;
+  }
+
+  private long renewDelegationToken(final UserGroupInformation loggedInUser,
+      final MRClientProtocol hsService, final DelegationToken dToken)
+      throws IOException, InterruptedException {
+    long nextExpTime = loggedInUser.doAs(new PrivilegedExceptionAction<Long>() {
+
+      @Override
+      public Long run() throws YarnRemoteException {
+        RenewDelegationTokenRequest request = Records
+            .newRecord(RenewDelegationTokenRequest.class);
+        request.setDelegationToken(dToken);
+        return hsService.renewDelegationToken(request).getNextExpirationTime();
+      }
+    });
+    return nextExpTime;
+  }
+
+  private void cancelDelegationToken(final UserGroupInformation loggedInUser,
+      final MRClientProtocol hsService, final DelegationToken dToken)
+      throws IOException, InterruptedException {
+
+    loggedInUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws YarnRemoteException {
+        CancelDelegationTokenRequest request = Records
+            .newRecord(CancelDelegationTokenRequest.class);
+        request.setDelegationToken(dToken);
+        hsService.cancelDelegationToken(request);
+        return null;
+      }
+    });
+  }
+
+  private MRClientProtocol getMRClientProtocol(DelegationToken token,
+      final InetSocketAddress hsAddress, String user, final Configuration conf) {
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser(user);
+    ugi.addToken(ProtoUtils.convertFromProtoFormat(token, hsAddress));
 
-    // Now try talking to JHS using the delegation token
-    UserGroupInformation ugi =
-        UserGroupInformation.createRemoteUser("TheDarkLord");
-    ugi.addToken(ProtoUtils.convertFromProtoFormat(
-        token, jobHistoryServer.getClientService().getBindAddress()));
     final YarnRPC rpc = YarnRPC.create(conf);
-    MRClientProtocol userUsingDT =
-        ugi.doAs(new PrivilegedAction<MRClientProtocol>() {
+    MRClientProtocol hsWithDT = ugi
+        .doAs(new PrivilegedAction<MRClientProtocol>() {
+
           @Override
           public MRClientProtocol run() {
             return (MRClientProtocol) rpc.getProxy(HSClientProtocol.class,
-              jobHistoryServer.getClientService().getBindAddress(), conf);
+                hsAddress, conf);
           }
         });
-    GetJobReportRequest jobReportRequest =
-        Records.newRecord(GetJobReportRequest.class);
-    jobReportRequest.setJobId(MRBuilderUtils.newJobId(123456, 1, 1));
-    try {
-      userUsingDT.getJobReport(jobReportRequest);
-    } catch (YarnRemoteException e) {
-      Assert.assertEquals("Unknown job job_123456_0001", e.getMessage());
-    }
+    return hsWithDT;
   }
 
 }


Reply via email to