Author: cnauroth
Date: Fri Jun 20 23:58:23 2014
New Revision: 1604300

URL: http://svn.apache.org/r1604300
Log:
HDFS-6222. Remove background token renewer from webhdfs. Contributed by Rushabh Shah and Daryn Sharp.
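
For context, a minimal usage sketch of what this change means for a client. It is a hedged illustration, not code from this commit: the namenode address and path are placeholders. With the background renewer gone, initialize() no longer registers a renew action; the first non-token operation lazily selects a token from the UGI or fetches one, an expired self-fetched token is replaced on the next operation, and close() cancels a token the filesystem fetched for itself.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LazyWebHdfsTokenSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // hypothetical cluster address; initialize() no longer starts a renewer thread
    FileSystem fs = FileSystem.newInstance(
        URI.create("webhdfs://nn.example.com:50070"), conf);
    // the first non-token op lazily fetches a token (or reuses a UGI token);
    // if that token later expires, the next op refetches one and retries
    fs.getFileStatus(new Path("/"));
    // close() now cancels a delegation token the filesystem fetched for itself
    fs.close();
  }
}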

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1604300&r1=1604299&r2=1604300&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Jun 20 23:58:23 2014
@@ -684,6 +684,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6535. HDFS quota update is wrong when file is appended. (George Wong
     via jing9)
 
+    HDFS-6222. Remove background token renewer from webhdfs.
+    (Rushabh Shah and Daryn Sharp via cnauroth)
+
   BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS
 
     HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java?rev=1604300&r1=1604299&r2=1604300&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenIdentifier.java Fri Jun 20 23:58:23 2014
@@ -23,6 +23,8 @@ import java.io.DataInputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.web.SWebHdfsFileSystem;
+import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
@@ -75,4 +77,25 @@ public class DelegationTokenIdentifier 
       return ident.toString();
     }
   }
+  
+  public static class WebHdfsDelegationTokenIdentifier
+      extends DelegationTokenIdentifier {
+    public WebHdfsDelegationTokenIdentifier() {
+      super();
+    }
+    @Override
+    public Text getKind() {
+      return WebHdfsFileSystem.TOKEN_KIND;
+    }
+  }
+  
+  public static class SWebHdfsDelegationTokenIdentifier extends WebHdfsDelegationTokenIdentifier {
+    public SWebHdfsDelegationTokenIdentifier() {
+      super();
+    }
+    @Override
+    public Text getKind() {
+      return SWebHdfsFileSystem.TOKEN_KIND;
+    }
+  }
 }
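
The two identifier subclasses above let a webhdfs or swebhdfs delegation token be decoded by its kind, via the META-INF/services registration added further down in this commit. A hedged illustration of what that enables; the helper below is not part of the patch and its token argument is a placeholder:

import java.io.IOException;

import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;

public class DecodeWebHdfsTokenSketch {
  // Returns the owner of a webhdfs/swebhdfs delegation token, relying on the
  // ServiceLoader registration of the identifier classes added by this patch.
  static String owner(Token<? extends TokenIdentifier> token) throws IOException {
    TokenIdentifier id = token.decodeIdentifier();
    if (id instanceof DelegationTokenIdentifier) {
      return ((DelegationTokenIdentifier) id).getUser().getUserName();
    }
    return "unregistered token kind: " + token.getKind();
  }
}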

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java?rev=1604300&r1=1604299&r2=1604300&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java Fri Jun 20 23:58:23 2014
@@ -36,8 +36,8 @@ public class SWebHdfsFileSystem extends 
   }
 
   @Override
-  protected synchronized void initializeTokenAspect() {
-    tokenAspect = new TokenAspect<SWebHdfsFileSystem>(this, tokenServiceName, TOKEN_KIND);
+  protected Text getTokenKind() {
+    return TOKEN_KIND;
   }
 
   @Override

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1604300&r1=1604299&r2=1604300&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Fri Jun 20 23:58:23 2014
@@ -69,11 +69,14 @@ import org.apache.hadoop.io.retry.RetryP
 import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.util.Progressable;
 import org.mortbay.util.ajax.JSON;
 
@@ -98,7 +101,7 @@ public class WebHdfsFileSystem extends F
 
   /** Delegation token kind */
   public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
-  protected TokenAspect<? extends WebHdfsFileSystem> tokenAspect;
+  private boolean canRefreshDelegationToken;
 
   private UserGroupInformation ugi;
   private URI uri;
@@ -127,13 +130,8 @@ public class WebHdfsFileSystem extends F
     return "http";
   }
 
-  /**
-   * Initialize tokenAspect. This function is intended to
-   * be overridden by SWebHdfsFileSystem.
-   */
-  protected synchronized void initializeTokenAspect() {
-    tokenAspect = new TokenAspect<WebHdfsFileSystem>(this, tokenServiceName,
-        TOKEN_KIND);
+  protected Text getTokenKind() {
+    return TOKEN_KIND;
   }
 
   @Override
@@ -162,7 +160,6 @@ public class WebHdfsFileSystem extends F
     this.tokenServiceName = isLogicalUri ?
         HAUtil.buildTokenServiceForLogicalUri(uri)
         : SecurityUtil.buildTokenService(getCanonicalUri());
-    initializeTokenAspect();
 
     if (!isHA) {
       this.retryPolicy =
@@ -195,10 +192,8 @@ public class WebHdfsFileSystem extends F
     }
 
     this.workingDir = getHomeDirectory();
-
-    if (UserGroupInformation.isSecurityEnabled()) {
-      tokenAspect.initDelegationToken(ugi);
-    }
+    this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
+    this.delegationToken = null;
   }
 
   @Override
@@ -213,11 +208,46 @@ public class WebHdfsFileSystem extends F
     return b;
   }
 
+  TokenSelector<DelegationTokenIdentifier> tokenSelector =
+      new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
+
+  // the first getAuthParams() for a non-token op will either get the
+  // internal token from the ugi or lazy fetch one
   protected synchronized Token<?> getDelegationToken() throws IOException {
-    tokenAspect.ensureTokenInitialized();
+    if (canRefreshDelegationToken && delegationToken == null) {
+      Token<?> token = tokenSelector.selectToken(
+          new Text(getCanonicalServiceName()), ugi.getTokens());
+      // ugi tokens are usually indicative of a task which can't
+      // refetch tokens.  even if ugi has credentials, don't attempt
+      // to get another token to match hdfs/rpc behavior
+      if (token != null) {
+        LOG.debug("Using UGI token: " + token);
+        canRefreshDelegationToken = false; 
+      } else {
+        token = getDelegationToken(null);
+        if (token != null) {
+          LOG.debug("Fetched new token: " + token);
+        } else { // security is disabled
+          canRefreshDelegationToken = false;
+        }
+      }
+      setDelegationToken(token);
+    }
     return delegationToken;
   }
 
+  @VisibleForTesting
+  synchronized boolean replaceExpiredDelegationToken() throws IOException {
+    boolean replaced = false;
+    if (canRefreshDelegationToken) {
+      Token<?> token = getDelegationToken(null);
+      LOG.debug("Replaced expired token: " + token);
+      setDelegationToken(token);
+      replaced = (token != null);
+    }
+    return replaced;
+  }
+
   @Override
   protected int getDefaultPort() {
     return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
@@ -288,8 +318,8 @@ public class WebHdfsFileSystem extends F
     final int code = conn.getResponseCode();
     // server is demanding an authentication we don't support
     if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
-      throw new IOException(
-          new AuthenticationException(conn.getResponseMessage()));
+      // match hdfs/rpc exception
+      throw new AccessControlException(conn.getResponseMessage());
     }
     if (code != op.getExpectedHttpResponseCode()) {
       final Map<?, ?> m;
@@ -309,7 +339,15 @@ public class WebHdfsFileSystem extends F
         return m;
       }
 
-      final RemoteException re = JsonUtil.toRemoteException(m);
+      IOException re = JsonUtil.toRemoteException(m);
+      // extract UGI-related exceptions and unwrap InvalidToken
+      // the NN mangles these exceptions but the DN does not and may need
+      // to re-fetch a token if either report the token is expired
+      if (re.getMessage().startsWith("Failed to obtain user group information:")) {
+        String[] parts = re.getMessage().split(":\\s+", 3);
+        re = new RemoteException(parts[1], parts[2]);
+        re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
+      }
       throw unwrapException? toIOException(re): re;
     }
     return null;
@@ -369,7 +407,7 @@ public class WebHdfsFileSystem extends F
     // Skip adding delegation token for token operations because these
     // operations require authentication.
     Token<?> token = null;
-    if (UserGroupInformation.isSecurityEnabled() && !op.getRequireAuth()) {
+    if (!op.getRequireAuth()) {
       token = getDelegationToken();
     }
     if (token != null) {
@@ -540,11 +578,17 @@ public class WebHdfsFileSystem extends F
             validateResponse(op, conn, false);
           }
           return getResponse(conn);
-        } catch (IOException ioe) {
-          Throwable cause = ioe.getCause();
-          if (cause != null && cause instanceof AuthenticationException) {
-            throw ioe; // no retries for auth failures
+        } catch (AccessControlException ace) {
+          // no retries for auth failures
+          throw ace;
+        } catch (InvalidToken it) {
+          // try to replace the expired token with a new one.  the attempt
+          // to acquire a new token must be outside this operation's retry
+          // so if it fails after its own retries, this operation fails too.
+          if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
+            throw it;
           }
+        } catch (IOException ioe) {
           shouldRetry(ioe, retry);
         }
       }
@@ -712,6 +756,17 @@ public class WebHdfsFileSystem extends F
       };
     }
   }
+
+  class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
+    FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
+      super(op, fspath, parameters);
+    }
+    @Override
+    HttpURLConnection getResponse(final HttpURLConnection conn)
+        throws IOException {
+      return conn;
+    }
+  }
   
   /**
    * Used by open() which tracks the resolved url itself
@@ -1077,16 +1132,41 @@ public class WebHdfsFileSystem extends F
       ) throws IOException {
     statistics.incrementReadOps(1);
     final HttpOpParam.Op op = GetOpParam.Op.OPEN;
-    final URL url = toUrl(op, f, new BufferSizeParam(buffersize));
+    // use a runner so the open can recover from an invalid token
+    FsPathConnectionRunner runner =
+        new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
     return new FSDataInputStream(new OffsetUrlInputStream(
-        new OffsetUrlOpener(url), new OffsetUrlOpener(null)));
+        new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
   }
 
   @Override
-  public void close() throws IOException {
-    super.close();
-    synchronized (this) {
-      tokenAspect.removeRenewAction();
+  public synchronized void close() throws IOException {
+    try {
+      if (canRefreshDelegationToken && delegationToken != null) {
+        cancelDelegationToken(delegationToken);
+      }
+    } catch (IOException ioe) {
+      LOG.debug("Token cancel failed: "+ioe);
+    } finally {
+      super.close();
+    }
+  }
+
+  // use FsPathConnectionRunner to ensure retries for InvalidTokens
+  class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
+    private final FsPathConnectionRunner runner;
+    UnresolvedUrlOpener(FsPathConnectionRunner runner) {
+      super(null);
+      this.runner = runner;
+    }
+
+    @Override
+    protected HttpURLConnection connect(long offset, boolean resolved)
+        throws IOException {
+      assert offset == 0;
+      HttpURLConnection conn = runner.run();
+      setURL(conn.getURL());
+      return conn;
     }
   }
 
@@ -1139,7 +1219,7 @@ public class WebHdfsFileSystem extends F
   }
 
   static class OffsetUrlInputStream extends ByteRangeInputStream {
-    OffsetUrlInputStream(OffsetUrlOpener o, OffsetUrlOpener r)
+    OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
         throws IOException {
       super(o, r);
     }
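
To summarize the new error handling in the retry loop above: an HTTP 401 now surfaces as AccessControlException and is never retried, an InvalidToken from an expired delegation token triggers a single attempt to fetch a replacement before the operation is retried, and other IOExceptions go through the usual retry policy. A hedged sketch of that control flow follows; attemptOnce, replaceExpiredToken and shouldRetryAfter are placeholders, not the real members:

import java.io.IOException;

import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;

class TokenRetrySketch {
  interface Attempt<T> { T attemptOnce() throws IOException; }

  // placeholders standing in for WebHdfsFileSystem internals
  private boolean replaceExpiredToken() throws IOException { return true; }
  private boolean shouldRetryAfter(IOException ioe, int attempt) { return attempt < 2; }

  <T> T runWithRetry(Attempt<T> op, boolean opRequiresAuth) throws IOException {
    for (int attempt = 0; ; attempt++) {
      try {
        return op.attemptOnce();
      } catch (AccessControlException ace) {
        throw ace;                              // auth failures are never retried
      } catch (InvalidToken it) {
        // token ops cannot refetch; otherwise try once to replace the expired token
        if (opRequiresAuth || !replaceExpiredToken()) {
          throw it;
        }
      } catch (IOException ioe) {
        if (!shouldRetryAfter(ioe, attempt)) {  // normal retry-policy decision
          throw ioe;
        }
      }
    }
  }
}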

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier?rev=1604300&r1=1604299&r2=1604300&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier Fri Jun 20 23:58:23 2014
@@ -13,3 +13,5 @@
 #
 org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier
 org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
+org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier$WebHdfsDelegationTokenIdentifier
+org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier$SWebHdfsDelegationTokenIdentifier
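
The two entries above are plain java.util.ServiceLoader provider registrations. A hedged way to confirm they are visible on a given classpath; this snippet is illustrative only and not part of the patch:

import java.util.ServiceLoader;

import org.apache.hadoop.security.token.TokenIdentifier;

public class ListTokenIdentifiersSketch {
  public static void main(String[] args) {
    // enumerates every TokenIdentifier named in META-INF/services files,
    // which should now include the webhdfs and swebhdfs identifiers
    for (TokenIdentifier id : ServiceLoader.load(TokenIdentifier.class)) {
      System.out.println(id.getClass().getName() + " -> " + id.getKind());
    }
  }
}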

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java?rev=1604300&r1=1604299&r2=1604300&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsTokens.java Fri Jun 20 23:58:23 2014
@@ -19,47 +19,63 @@
 package org.apache.hadoop.hdfs.web;
 
 import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.SIMPLE;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.reset;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
 import org.apache.hadoop.hdfs.web.resources.GetOpParam;
 import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
 import org.apache.hadoop.hdfs.web.resources.PostOpParam;
 import org.apache.hadoop.hdfs.web.resources.PutOpParam;
+import org.apache.hadoop.http.HttpConfig;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
+import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
 
 public class TestWebHdfsTokens {
   private static Configuration conf;
+  URI uri = null;
 
   @BeforeClass
   public static void setUp() {
     conf = new Configuration();
     SecurityUtil.setAuthenticationMethod(KERBEROS, conf);
     UserGroupInformation.setConfiguration(conf);    
+    UserGroupInformation.setLoginUser(
+        UserGroupInformation.createUserForTesting(
+            "LoginUser", new String[]{"supergroup"}));
   }
 
   private WebHdfsFileSystem spyWebhdfsInSecureSetup() throws IOException {
     WebHdfsFileSystem fsOrig = new WebHdfsFileSystem();
     fsOrig.initialize(URI.create("webhdfs://127.0.0.1:0"), conf);
     WebHdfsFileSystem fs = spy(fsOrig);
-    Whitebox.setInternalState(fsOrig.tokenAspect, "fs", fs);
     return fs;
   }
 
@@ -89,7 +105,7 @@ public class TestWebHdfsTokens {
   }
 
   @Test(timeout = 5000)
-  public void testNoTokenForCanclToken() throws IOException {
+  public void testNoTokenForRenewToken() throws IOException {
     checkNoTokenForOperation(PutOpParam.Op.RENEWDELEGATIONTOKEN);
   }
 
@@ -139,4 +155,277 @@ public class TestWebHdfsTokens {
       assertFalse(op.getRequireAuth());
     }
   }
+  
+  @SuppressWarnings("unchecked") // for any(Token.class)
+  @Test
+  public void testLazyTokenFetchForWebhdfs() throws Exception {
+    MiniDFSCluster cluster = null;
+    WebHdfsFileSystem fs = null;
+    try {
+      final Configuration clusterConf = new HdfsConfiguration(conf);
+      SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
+      clusterConf.setBoolean(DFSConfigKeys
+          .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+
+      // trick the NN into thinking security is enabled w/o it trying
+      // to login from a keytab
+      UserGroupInformation.setConfiguration(clusterConf);
+      cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
+      cluster.waitActive();
+      SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
+      UserGroupInformation.setConfiguration(clusterConf);
+      
+      uri = DFSUtil.createUri(
+          "webhdfs", cluster.getNameNode().getHttpAddress());
+      validateLazyTokenFetch(clusterConf);
+    } finally {
+      IOUtils.cleanup(null, fs);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked") // for any(Token.class)
+  @Test
+  public void testLazyTokenFetchForSWebhdfs() throws Exception {
+    MiniDFSCluster cluster = null;
+    SWebHdfsFileSystem fs = null;
+    try {
+      final Configuration clusterConf = new HdfsConfiguration(conf);
+      SecurityUtil.setAuthenticationMethod(SIMPLE, clusterConf);
+      clusterConf.setBoolean(DFSConfigKeys
+           .DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY, true);
+      String BASEDIR = System.getProperty("test.build.dir",
+          "target/test-dir") + "/" + TestWebHdfsTokens.class.getSimpleName();
+      String keystoresDir;
+      String sslConfDir;
+           
+      clusterConf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
+      clusterConf.set(DFSConfigKeys.DFS_HTTP_POLICY_KEY, HttpConfig.Policy.HTTPS_ONLY.name());
+      clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, "localhost:0");
+      clusterConf.set(DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, "localhost:0");
+         
+      File base = new File(BASEDIR);
+      FileUtil.fullyDelete(base);
+      base.mkdirs();
+      keystoresDir = new File(BASEDIR).getAbsolutePath();
+      sslConfDir = KeyStoreTestUtil.getClasspathDir(TestWebHdfsTokens.class);
+      KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfDir, clusterConf, false);
+         
+      // trick the NN into thinking security is enabled w/o it trying
+      // to login from a keytab
+      UserGroupInformation.setConfiguration(clusterConf);
+      cluster = new MiniDFSCluster.Builder(clusterConf).numDataNodes(1).build();
+      cluster.waitActive();
+      InetSocketAddress addr = cluster.getNameNode().getHttpsAddress();
+      String nnAddr = NetUtils.getHostPortString(addr);
+      clusterConf.set(DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY, nnAddr);
+      SecurityUtil.setAuthenticationMethod(KERBEROS, clusterConf);
+      UserGroupInformation.setConfiguration(clusterConf);
+      
+      uri = DFSUtil.createUri(
+        "swebhdfs", cluster.getNameNode().getHttpsAddress());
+      validateLazyTokenFetch(clusterConf);
+      } finally {
+        IOUtils.cleanup(null, fs);
+        if (cluster != null) {
+          cluster.shutdown();
+        }
+     }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private void validateLazyTokenFetch(final Configuration clusterConf) throws Exception {
+    final String testUser = "DummyUser";
+    UserGroupInformation ugi = UserGroupInformation.createUserForTesting(
+      testUser, new String[]{"supergroup"});
+    WebHdfsFileSystem fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
+      @Override
+      public WebHdfsFileSystem run() throws IOException {
+        return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
+      }
+    });
+    // verify token ops don't get a token
+    Assert.assertNull(fs.getRenewToken());
+    Token<?> token = fs.getDelegationToken(null);
+    fs.renewDelegationToken(token);
+    fs.cancelDelegationToken(token);
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    Assert.assertNull(fs.getRenewToken());
+    reset(fs);
+
+    // verify first non-token op gets a token
+    final Path p = new Path("/f");
+    fs.create(p, (short)1).close();
+    verify(fs, times(1)).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, times(1)).getDelegationToken(anyString());
+    verify(fs, times(1)).setDelegationToken(any(Token.class));
+    token = fs.getRenewToken();
+    Assert.assertNotNull(token);      
+    Assert.assertEquals(testUser, getTokenOwner(token));
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    reset(fs);
+
+    // verify prior token is reused
+    fs.getFileStatus(p);
+    verify(fs, times(1)).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    Token<?> token2 = fs.getRenewToken();
+    Assert.assertNotNull(token2);
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    Assert.assertSame(token, token2);
+    reset(fs);
+
+    // verify renew of expired token fails w/o getting a new token
+    token = fs.getRenewToken();
+    fs.cancelDelegationToken(token);
+    try {
+      fs.renewDelegationToken(token);
+      Assert.fail("should have failed");
+    } catch (InvalidToken it) {
+    } catch (Exception ex) {
+      Assert.fail("wrong exception:"+ex);
+    }
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    token2 = fs.getRenewToken();
+    Assert.assertNotNull(token2);
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    Assert.assertSame(token, token2);
+    reset(fs);
+
+    // verify cancel of expired token fails w/o getting a new token
+    try {
+      fs.cancelDelegationToken(token);
+      Assert.fail("should have failed");
+    } catch (InvalidToken it) {
+    } catch (Exception ex) {
+      Assert.fail("wrong exception:"+ex);
+    }
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    token2 = fs.getRenewToken();
+    Assert.assertNotNull(token2);
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    Assert.assertSame(token, token2);
+    reset(fs);
+
+    // verify an expired token is replaced with a new token
+    fs.open(p).close();
+    verify(fs, times(2)).getDelegationToken(); // first bad, then good
+    verify(fs, times(1)).replaceExpiredDelegationToken();
+    verify(fs, times(1)).getDelegationToken(null);
+    verify(fs, times(1)).setDelegationToken(any(Token.class));
+    token2 = fs.getRenewToken();
+    Assert.assertNotNull(token2);
+    Assert.assertNotSame(token, token2);
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    Assert.assertEquals(testUser, getTokenOwner(token2));
+    reset(fs);
+
+    // verify with open because it's a little different in how it
+    // opens connections
+    fs.cancelDelegationToken(fs.getRenewToken());
+    InputStream is = fs.open(p);
+    is.read();
+    is.close();
+    verify(fs, times(2)).getDelegationToken(); // first bad, then good
+    verify(fs, times(1)).replaceExpiredDelegationToken();
+    verify(fs, times(1)).getDelegationToken(null);
+    verify(fs, times(1)).setDelegationToken(any(Token.class));
+    token2 = fs.getRenewToken();
+    Assert.assertNotNull(token2);
+    Assert.assertNotSame(token, token2);
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    Assert.assertEquals(testUser, getTokenOwner(token2));
+    reset(fs);
+
+    // verify fs close cancels the token
+    fs.close();
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, times(1)).cancelDelegationToken(eq(token2));
+
+    // add a token to ugi for a new fs, verify it uses that token
+    token = fs.getDelegationToken(null);
+    ugi.addToken(token);
+    fs = ugi.doAs(new PrivilegedExceptionAction<WebHdfsFileSystem>() {
+      @Override
+      public WebHdfsFileSystem run() throws IOException {
+        return spy((WebHdfsFileSystem) FileSystem.newInstance(uri, clusterConf));
+      }
+    });
+    Assert.assertNull(fs.getRenewToken());
+    fs.getFileStatus(new Path("/"));
+    verify(fs, times(1)).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, times(1)).setDelegationToken(eq(token));
+    token2 = fs.getRenewToken();
+    Assert.assertNotNull(token2);
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    Assert.assertSame(token, token2);
+    reset(fs);
+
+    // verify it reuses the prior ugi token
+    fs.getFileStatus(new Path("/"));
+    verify(fs, times(1)).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    token2 = fs.getRenewToken();
+    Assert.assertNotNull(token2);
+    Assert.assertEquals(fs.getTokenKind(), token.getKind());
+    Assert.assertSame(token, token2);
+    reset(fs);
+
+    // verify an expired ugi token is NOT replaced with a new token
+    fs.cancelDelegationToken(token);
+    for (int i=0; i<2; i++) {
+      try {
+        fs.getFileStatus(new Path("/"));
+        Assert.fail("didn't fail");
+      } catch (InvalidToken it) {
+      } catch (Exception ex) {
+        Assert.fail("wrong exception:"+ex);
+      }
+      verify(fs, times(1)).getDelegationToken();
+      verify(fs, times(1)).replaceExpiredDelegationToken();
+      verify(fs, never()).getDelegationToken(anyString());
+      verify(fs, never()).setDelegationToken(any(Token.class));
+      token2 = fs.getRenewToken();
+      Assert.assertNotNull(token2);
+      Assert.assertEquals(fs.getTokenKind(), token.getKind());
+      Assert.assertSame(token, token2);
+      reset(fs);
+    }
+    
+    // verify fs close does NOT cancel the ugi token
+    fs.close();
+    verify(fs, never()).getDelegationToken();
+    verify(fs, never()).replaceExpiredDelegationToken();
+    verify(fs, never()).getDelegationToken(anyString());
+    verify(fs, never()).setDelegationToken(any(Token.class));
+    verify(fs, never()).cancelDelegationToken(any(Token.class));
+  } 
+  
+  private String getTokenOwner(Token<?> token) throws IOException {
+    // webhdfs doesn't register properly with the class loader
+    @SuppressWarnings({ "rawtypes", "unchecked" })
+    Token<?> clone = new Token(token);
+    clone.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    return clone.decodeIdentifier().getUser().getUserName();
+  }
 }

