HADOOP-14445. Delegation tokens are not shared between KMS instances. 
Contributed by Xiao Chen and Rushabh S Shah.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/583fa6ed
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/583fa6ed
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/583fa6ed

Branch: refs/heads/HDFS-7240
Commit: 583fa6ed48ad3df40bcaa9c591d5ccd07ce3ea81
Parents: e813975
Author: Xiao Chen <x...@apache.org>
Authored: Tue Apr 10 15:26:33 2018 -0700
Committer: Xiao Chen <x...@apache.org>
Committed: Tue Apr 10 15:38:25 2018 -0700

----------------------------------------------------------------------
 .../crypto/key/kms/KMSClientProvider.java       | 212 ++++----
 .../crypto/key/kms/KMSDelegationToken.java      |  22 +-
 .../crypto/key/kms/KMSLegacyTokenRenewer.java   |  56 ++
 .../hadoop/crypto/key/kms/KMSTokenRenewer.java  | 103 ++++
 .../hadoop/crypto/key/kms/package-info.java     |  18 +
 .../fs/CommonConfigurationKeysPublic.java       |  10 +
 .../web/DelegationTokenAuthenticatedURL.java    |  21 +-
 .../DelegationTokenAuthenticationHandler.java   |   8 +-
 .../web/DelegationTokenAuthenticator.java       |   2 +-
 .../java/org/apache/hadoop/util/KMSUtil.java    |  45 +-
 .../hadoop/util/KMSUtilFaultInjector.java       |  49 ++
 ...apache.hadoop.security.token.TokenIdentifier |   1 +
 ...rg.apache.hadoop.security.token.TokenRenewer |   3 +-
 .../src/main/resources/core-default.xml         |  20 +
 .../crypto/key/kms/TestKMSClientProvider.java   | 162 ++++++
 .../kms/TestLoadBalancingKMSClientProvider.java |  67 ++-
 .../org/apache/hadoop/util/TestKMSUtil.java     |  65 +++
 .../hadoop/crypto/key/kms/server/TestKMS.java   | 519 ++++++++++++++++---
 18 files changed, 1180 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
index 2eb2e21..f97fde7 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java
@@ -36,8 +36,9 @@ import 
org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.TokenSelector;
 import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
 import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
 import org.apache.hadoop.util.HttpExceptionUtils;
 import org.apache.hadoop.util.KMSUtil;
@@ -82,6 +83,8 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_KEY;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT;
 import static org.apache.hadoop.util.KMSUtil.checkNotEmpty;
 import static org.apache.hadoop.util.KMSUtil.checkNotNull;
 import static org.apache.hadoop.util.KMSUtil.parseJSONEncKeyVersion;
@@ -96,16 +99,13 @@ import static 
org.apache.hadoop.util.KMSUtil.parseJSONMetadata;
 public class KMSClientProvider extends KeyProvider implements CryptoExtension,
     KeyProviderDelegationTokenExtension.DelegationTokenExtension {
 
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(KMSClientProvider.class);
 
   private static final String INVALID_SIGNATURE = "Invalid signature";
 
   private static final String ANONYMOUS_REQUESTS_DISALLOWED = "Anonymous requests are disallowed";
 
-  public static final String TOKEN_KIND_STR = KMSDelegationToken.TOKEN_KIND_STR;
-  public static final Text TOKEN_KIND = KMSDelegationToken.TOKEN_KIND;
-
   public static final String SCHEME_NAME = "kms";
 
   private static final String UTF8 = "UTF-8";
@@ -133,12 +133,17 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
   private static final ObjectWriter WRITER =
       new ObjectMapper().writerWithDefaultPrettyPrinter();
 
+  /* dtService defines the token service value for the kms token.
+   * The value can be in the legacy ip:port format, or it can be a URI.
+   * If it is a URI, then the value is read from
+   * CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH at key
+   * provider creation time, and set to the token's Service field.
+   * When a token is renewed / canceled, its Service field will be used to
+   * instantiate a KeyProvider, eliminating the need to read configs
+   * at that time.
+   */
   private final Text dtService;
-
-  // Allow fallback to default kms server port 9600 for certain tests that do
-  // not specify the port explicitly in the kms provider url.
-  @VisibleForTesting
-  public static volatile boolean fallbackDefaultPortForTesting = false;
+  private final boolean copyLegacyToken;
 
   private class EncryptedQueueRefiller implements
     ValueQueue.QueueRefiller<EncryptedKeyVersion> {
@@ -162,66 +167,6 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
     }
   }
 
-  /**
-   * The KMS implementation of {@link TokenRenewer}.
-   */
-  public static class KMSTokenRenewer extends TokenRenewer {
-    private static final Logger LOG =
-        LoggerFactory.getLogger(KMSTokenRenewer.class);
-
-    @Override
-    public boolean handleKind(Text kind) {
-      return kind.equals(TOKEN_KIND);
-    }
-
-    @Override
-    public boolean isManaged(Token<?> token) throws IOException {
-      return true;
-    }
-
-    @Override
-    public long renew(Token<?> token, Configuration conf) throws IOException {
-      LOG.debug("Renewing delegation token {}", token);
-      KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
-          KeyProviderFactory.KEY_PROVIDER_PATH);
-      try {
-        if (!(keyProvider instanceof
-            KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
-          LOG.warn("keyProvider {} cannot renew dt.", keyProvider == null ?
-              "null" : keyProvider.getClass());
-          return 0;
-        }
-        return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
-            keyProvider).renewDelegationToken(token);
-      } finally {
-        if (keyProvider != null) {
-          keyProvider.close();
-        }
-      }
-    }
-
-    @Override
-    public void cancel(Token<?> token, Configuration conf) throws IOException {
-      LOG.debug("Canceling delegation token {}", token);
-      KeyProvider keyProvider = KMSUtil.createKeyProvider(conf,
-          KeyProviderFactory.KEY_PROVIDER_PATH);
-      try {
-        if (!(keyProvider instanceof
-            KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
-          LOG.warn("keyProvider {} cannot cancel dt.", keyProvider == null ?
-              "null" : keyProvider.getClass());
-          return;
-        }
-        ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
-            keyProvider).cancelDelegationToken(token);
-      } finally {
-        if (keyProvider != null) {
-          keyProvider.close();
-        }
-      }
-    }
-  }
-
   public static class KMSEncryptedKeyVersion extends EncryptedKeyVersion {
     public KMSEncryptedKeyVersion(String keyName, String keyVersionName,
         byte[] iv, String encryptedVersionName, byte[] keyMaterial) {
@@ -281,13 +226,13 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
           }
           hostsPart = t[0];
         }
-        return createProvider(conf, origUrl, port, hostsPart);
+        return createProvider(conf, origUrl, port, hostsPart, providerUri);
       }
       return null;
     }
 
-    private KeyProvider createProvider(Configuration conf,
-        URL origUrl, int port, String hostsPart) throws IOException {
+    private KeyProvider createProvider(Configuration conf, URL origUrl,
+        int port, String hostsPart, URI providerUri) throws IOException {
       String[] hosts = hostsPart.split(";");
       KMSClientProvider[] providers = new KMSClientProvider[hosts.length];
       for (int i = 0; i < hosts.length; i++) {
@@ -295,7 +240,7 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
           providers[i] =
               new KMSClientProvider(
                   new URI("kms", origUrl.getProtocol(), hosts[i], port,
-                      origUrl.getPath(), null, null), conf);
+                      origUrl.getPath(), null, null), conf, providerUri);
         } catch (URISyntaxException e) {
           throw new IOException("Could not instantiate KMSProvider.", e);
         }
@@ -353,17 +298,10 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
     }
   }
 
-  public KMSClientProvider(URI uri, Configuration conf) throws IOException {
+  public KMSClientProvider(URI uri, Configuration conf, URI providerUri) throws
+      IOException {
     super(conf);
     kmsUrl = createServiceURL(extractKMSPath(uri));
-    int kmsPort = kmsUrl.getPort();
-    if ((kmsPort == -1) && fallbackDefaultPortForTesting) {
-      kmsPort = 9600;
-    }
-
-    InetSocketAddress addr = new InetSocketAddress(kmsUrl.getHost(), kmsPort);
-    dtService = SecurityUtil.buildTokenService(addr);
-
     if ("https".equalsIgnoreCase(kmsUrl.getProtocol())) {
       sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
       try {
@@ -376,6 +314,9 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
             CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_SECONDS,
             CommonConfigurationKeysPublic.KMS_CLIENT_TIMEOUT_DEFAULT);
     authRetry = conf.getInt(AUTH_RETRY, DEFAULT_AUTH_RETRY);
+    copyLegacyToken = conf.getBoolean(KMS_CLIENT_COPY_LEGACY_TOKEN_KEY,
+        KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT);
+
     configurator = new TimeoutConnConfigurator(timeout, sslFactory);
     encKeyVersionQueue =
         new ValueQueue<KeyProviderCryptoExtension.EncryptedKeyVersion>(
@@ -400,6 +341,7 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
                     KMS_CLIENT_ENC_KEY_CACHE_NUM_REFILL_THREADS_DEFAULT),
             new EncryptedQueueRefiller());
     authToken = new DelegationTokenAuthenticatedURL.Token();
+    dtService = new Text(providerUri.toString());
     LOG.info("KMSClientProvider for KMS url: {} delegation token service: {}" +
         " created.", kmsUrl, dtService);
   }
@@ -473,7 +415,7 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
         @Override
         public HttpURLConnection run() throws Exception {
           DelegationTokenAuthenticatedURL authUrl =
-              new DelegationTokenAuthenticatedURL(configurator);
+              createKMSAuthenticatedURL();
           return authUrl.openConnection(url, authToken, doAsUser);
         }
       });
@@ -924,7 +866,7 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
       LOG.debug("Renewing delegation token {} with url:{}, as:{}",
           token, url, doAsUser);
       final DelegationTokenAuthenticatedURL authUrl =
-          new DelegationTokenAuthenticatedURL(configurator);
+          createKMSAuthenticatedURL();
       return getActualUgi().doAs(
           new PrivilegedExceptionAction<Long>() {
             @Override
@@ -956,7 +898,7 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
               LOG.debug("Cancelling delegation token {} with url:{}, as:{}",
                   dToken, url, doAsUser);
               final DelegationTokenAuthenticatedURL authUrl =
-                  new DelegationTokenAuthenticatedURL(configurator);
+                  createKMSAuthenticatedURL();
               authUrl.cancelDelegationToken(url, token, doAsUser);
               return null;
             }
@@ -1008,6 +950,17 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
     return token;
   }
 
+  @VisibleForTesting
+  DelegationTokenAuthenticatedURL createKMSAuthenticatedURL() {
+    return new DelegationTokenAuthenticatedURL(configurator) {
+      @Override
+      public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+          getDelegationToken(URL url, Credentials creds) {
+        return selectKMSDelegationToken(creds);
+      }
+    };
+  }
+
   @Override
   public Token<?>[] addDelegationTokens(final String renewer,
       Credentials credentials) throws IOException {
@@ -1016,7 +969,7 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
     if (token == null) {
       final URL url = createURL(null, null, null, null);
       final DelegationTokenAuthenticatedURL authUrl =
-          new DelegationTokenAuthenticatedURL(configurator);
+          createKMSAuthenticatedURL();
       try {
         final String doAsUser = getDoAsUser();
         token = getActualUgi().doAs(new PrivilegedExceptionAction<Token<?>>() {
@@ -1030,9 +983,16 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
           }
         });
         if (token != null) {
-          LOG.debug("New token received: ({})", token);
+          if (KMSDelegationToken.TOKEN_KIND.equals(token.getKind())) {
+            // do not set service for legacy kind, for compatibility.
+            token.setService(dtService);
+          }
+          LOG.info("New token created: ({})", token);
           credentials.addToken(token.getService(), token);
-          tokens = new Token<?>[] { token };
+          Token<?> legacyToken = createAndAddLegacyToken(credentials, token);
+          tokens = legacyToken == null ?
+              new Token<?>[] {token} :
+              new Token<?>[] {token, legacyToken};
         } else {
           throw new IOException("Got NULL as delegation token");
         }
@@ -1049,13 +1009,75 @@ public class KMSClientProvider extends KeyProvider 
implements CryptoExtension,
     return tokens;
   }
 
-  private boolean containsKmsDt(UserGroupInformation ugi) throws IOException {
-    // Add existing credentials from the UGI, since provider is cached.
-    Credentials creds = ugi.getCredentials();
+  /**
+   * If {@link CommonConfigurationKeysPublic#KMS_CLIENT_COPY_LEGACY_TOKEN_KEY}
+   * is true when creating the provider, then copy the passed-in token of
+   * {@link KMSDelegationToken#TOKEN_KIND} and create a new token of
+   * {@link KMSDelegationToken#TOKEN_LEGACY_KIND}, and add it to credentials.
+   *
+   * @return The legacy token, or null if one should not be created.
+   */
+  private Token<?> createAndAddLegacyToken(Credentials credentials,
+      Token<?> token) {
+    if (!copyLegacyToken || !KMSDelegationToken.TOKEN_KIND
+        .equals(token.getKind())) {
+      LOG.debug("Not creating legacy token because copyLegacyToken={}, "
+          + "token={}", copyLegacyToken, token);
+      return null;
+    }
+    // copy a KMS_DELEGATION_TOKEN and create a new kms-dt with the same
+    // underlying token for backwards-compatibility. Old clients/renewers
+    // do not parse the new token and can only work with kms-dt.
+    final Token<?> legacyToken = token.copyToken();
+    legacyToken.setKind(KMSDelegationToken.TOKEN_LEGACY_KIND);
+    final InetSocketAddress addr =
+        new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
+    final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
+    legacyToken.setService(fallBackServiceText);
+    LOG.info("Copied token to legacy kind: {}", legacyToken);
+    credentials.addToken(legacyToken.getService(), legacyToken);
+    return legacyToken;
+  }
+
+  @VisibleForTesting
+  public Text getDelegationTokenService() {
+    return dtService;
+  }
+
+  /**
+   * From all tokens in the given credentials, return the token that should
+   * be used for KMS authentication.
+   */
+  @VisibleForTesting
+  Token selectKMSDelegationToken(Credentials creds) {
+    // always look for TOKEN_KIND first
+    final TokenSelector<AbstractDelegationTokenIdentifier> tokenSelector =
+        new AbstractDelegationTokenSelector<AbstractDelegationTokenIdentifier>(
+            KMSDelegationToken.TOKEN_KIND) {
+        };
+    Token token = tokenSelector.selectToken(dtService, creds.getAllTokens());
+    LOG.debug("Searching service {} found token {}", dtService, token);
+    if (token != null) {
+      return token;
+    }
+
+    // fall back to look for token by service, regardless of kind.
+    // this is old behavior, keeping for compatibility reasons (for example,
+    // even if KMS server is new, if the job is submitted with an old kms
+    // client, job runners on new version should be able to find the token).
+    final InetSocketAddress addr =
+        new InetSocketAddress(kmsUrl.getHost(), kmsUrl.getPort());
+    final Text fallBackServiceText = SecurityUtil.buildTokenService(addr);
+    token = creds.getToken(fallBackServiceText);
+    LOG.debug("Selected delegation token {} using service:{}", token,
+        fallBackServiceText);
+    return token;
+  }
+
+  private boolean containsKmsDt(UserGroupInformation ugi) {
+    final Credentials creds = ugi.getCredentials();
     if (!creds.getAllTokens().isEmpty()) {
-      LOG.debug("Searching for token that matches service: {}", dtService);
-      org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
-          dToken = creds.getToken(dtService);
+      final Token dToken = selectKMSDelegationToken(creds);
       if (dToken != null) {
         return true;
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java
index adeebf2..2642e79 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSDelegationToken.java
@@ -27,7 +27,10 @@ import 
org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier
 @InterfaceAudience.Private
 public final class KMSDelegationToken {
 
-  public static final String TOKEN_KIND_STR = "kms-dt";
+  public static final String TOKEN_LEGACY_KIND_STR = "kms-dt";
+  public static final Text TOKEN_LEGACY_KIND = new Text(TOKEN_LEGACY_KIND_STR);
+
+  public static final String TOKEN_KIND_STR = "KMS_DELEGATION_TOKEN";
   public static final Text TOKEN_KIND = new Text(TOKEN_KIND_STR);
 
   // Utility class is not supposed to be instantiated.
@@ -49,4 +52,21 @@ public final class KMSDelegationToken {
       return TOKEN_KIND;
     }
   }
+
+  /**
+   * DelegationTokenIdentifier used for the KMS for legacy tokens.
+   */
+  @Deprecated
+  public static class KMSLegacyDelegationTokenIdentifier
+      extends DelegationTokenIdentifier {
+
+    public KMSLegacyDelegationTokenIdentifier() {
+      super(TOKEN_LEGACY_KIND);
+    }
+
+    @Override
+    public Text getKind() {
+      return TOKEN_LEGACY_KIND;
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java
new file mode 100644
index 0000000..fd27073
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSLegacyTokenRenewer.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.KMSUtil;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
+
+/**
+ * The {@link KMSTokenRenewer} that supports legacy tokens.
+ */
+@InterfaceAudience.Private
+@Deprecated
+public class KMSLegacyTokenRenewer extends KMSTokenRenewer {
+
+  @Override
+  public boolean handleKind(Text kind) {
+    return kind.equals(TOKEN_LEGACY_KIND);
+  }
+
+  /**
+   * Create a key provider for token renewal / cancellation.
+   * Caller is responsible for closing the key provider.
+   */
+  @Override
+  protected KeyProvider createKeyProvider(Token<?> token,
+      Configuration conf) throws IOException {
+    assert token.getKind().equals(TOKEN_LEGACY_KIND);
+    // Legacy tokens get service from configuration.
+    return KMSUtil.createKeyProvider(conf,
+        CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java
new file mode 100644
index 0000000..908ad39
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSTokenRenewer.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.crypto.key.KeyProviderDelegationTokenExtension;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.KMSUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+
+/**
+ * The KMS implementation of {@link TokenRenewer}.
+ */
+@InterfaceAudience.Private
+public class KMSTokenRenewer extends TokenRenewer {
+
+  public static final Logger LOG = LoggerFactory
+      .getLogger(org.apache.hadoop.crypto.key.kms.KMSTokenRenewer.class);
+
+  @Override
+  public boolean handleKind(Text kind) {
+    return kind.equals(TOKEN_KIND);
+  }
+
+  @Override
+  public boolean isManaged(Token<?> token) throws IOException {
+    return true;
+  }
+
+  @Override
+  public long renew(Token<?> token, Configuration conf) throws IOException {
+    LOG.debug("Renewing delegation token {}", token);
+    final KeyProvider keyProvider = createKeyProvider(token, conf);
+    try {
+      if (!(keyProvider instanceof
+          KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
+        LOG.warn("keyProvider {} cannot renew token {}.",
+            keyProvider == null ? "null" : keyProvider.getClass(), token);
+        return 0;
+      }
+      return ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
+          keyProvider).renewDelegationToken(token);
+    } finally {
+      if (keyProvider != null) {
+        keyProvider.close();
+      }
+    }
+  }
+
+  @Override
+  public void cancel(Token<?> token, Configuration conf) throws IOException {
+    LOG.debug("Canceling delegation token {}", token);
+    final KeyProvider keyProvider = createKeyProvider(token, conf);
+    try {
+      if (!(keyProvider instanceof
+          KeyProviderDelegationTokenExtension.DelegationTokenExtension)) {
+        LOG.warn("keyProvider {} cannot cancel token {}.",
+            keyProvider == null ? "null" : keyProvider.getClass(), token);
+        return;
+      }
+      ((KeyProviderDelegationTokenExtension.DelegationTokenExtension)
+          keyProvider).cancelDelegationToken(token);
+    } finally {
+      if (keyProvider != null) {
+        keyProvider.close();
+      }
+    }
+  }
+
+  /**
+   * Create a key provider for token renewal / cancellation.
+   * Caller is responsible for closing the key provider.
+   */
+  protected KeyProvider createKeyProvider(Token<?> token,
+      Configuration conf) throws IOException {
+    return KMSUtil
+        .createKeyProviderFromTokenService(conf, token.getService().toString());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java
new file mode 100644
index 0000000..eea93c2
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
index 8cd753a..be1c7bc 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java
@@ -775,6 +775,16 @@ public class CommonConfigurationKeysPublic {
    * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
    * core-default.xml</a>
    */
+  public static final String KMS_CLIENT_COPY_LEGACY_TOKEN_KEY =
+      "hadoop.security.kms.client.copy.legacy.token";
+  /**  Default value is true. */
+  public static final boolean KMS_CLIENT_COPY_LEGACY_TOKEN_DEFAULT = true;
+
+  /**
+   * @see
+   * <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
+   * core-default.xml</a>
+   */
   public static final String KMS_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY =
       "hadoop.security.kms.client.failover.sleep.max.millis";
   /** Default value is 2 secs. */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
index 0b1fdf8..0ddc4fc 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticatedURL.java
@@ -300,11 +300,7 @@ public class DelegationTokenAuthenticatedURL extends 
AuthenticatedURL {
             creds.getAllTokens());
       }
       if (!creds.getAllTokens().isEmpty()) {
-        InetSocketAddress serviceAddr = new InetSocketAddress(url.getHost(),
-            url.getPort());
-        Text service = SecurityUtil.buildTokenService(serviceAddr);
-        dToken = creds.getToken(service);
-        LOG.debug("Using delegation token {} from service:{}", dToken, service);
+        dToken = getDelegationToken(url, creds);
         if (dToken != null) {
           if (useQueryStringForDelegationToken()) {
             // delegation token will go in the query string, injecting it
@@ -341,6 +337,21 @@ public class DelegationTokenAuthenticatedURL extends AuthenticatedURL {
   }
 
   /**
+   * Select a delegation token from all tokens in credentials, based on url.
+   */
+  @InterfaceAudience.Private
+  public org.apache.hadoop.security.token.Token<? extends TokenIdentifier>
+      getDelegationToken(URL url, Credentials creds) {
+    final InetSocketAddress serviceAddr =
+        new InetSocketAddress(url.getHost(), url.getPort());
+    final Text service = SecurityUtil.buildTokenService(serviceAddr);
+    org.apache.hadoop.security.token.Token<? extends TokenIdentifier> dToken =
+        creds.getToken(service);
+    LOG.debug("Selected delegation token {} using service:{}", dToken, service);
+    return dToken;
+  }
+
+  /**
    * Requests a delegation token using the configured <code>Authenticator</code>
    * for authentication.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
index 6ee59f1..0ef102e7 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticationHandler.java
@@ -81,7 +81,7 @@ import com.google.common.annotations.VisibleForTesting;
 @InterfaceStability.Evolving
 public abstract class DelegationTokenAuthenticationHandler
     implements AuthenticationHandler {
-  private static final Logger LOG =
+  public static final Logger LOG =
       LoggerFactory.getLogger(DelegationTokenAuthenticationHandler.class);
 
   protected static final String TYPE_POSTFIX = "-dt";
@@ -224,7 +224,8 @@ public abstract class DelegationTokenAuthenticationHandler
       HttpServletRequest request, HttpServletResponse response)
       throws IOException, AuthenticationException {
     boolean requestContinues = true;
-    LOG.trace("Processing operation for req=({}), token: {}", request, token);
+    LOG.trace("Processing operation for req=({}), token: {}",
+        request.getRequestURL(), token);
     String op = ServletUtils.getParameter(request,
         KerberosDelegationTokenAuthenticator.OP_PARAM);
     op = (op != null) ? StringUtils.toUpperCase(op) : null;
@@ -407,7 +408,8 @@ public abstract class DelegationTokenAuthenticationHandler
             HttpServletResponse.SC_FORBIDDEN, new AuthenticationException(ex));
       }
     } else {
-      LOG.debug("Falling back to {} (req={})", authHandler.getClass(), request);
+      LOG.debug("Falling back to {} (req={})", authHandler.getClass(),
+          request.getRequestURL());
       token = authHandler.authenticate(request, response);
     }
     return token;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java
index 617773b..7e83781 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/token/delegation/web/DelegationTokenAuthenticator.java
@@ -50,7 +50,7 @@ import java.util.Map;
 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public abstract class DelegationTokenAuthenticator implements Authenticator {
-  private static Logger LOG = 
+  public static final Logger LOG =
       LoggerFactory.getLogger(DelegationTokenAuthenticator.class);
   
   private static final String CONTENT_TYPE = "Content-Type";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
index c96c6fb..8077076 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtil.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.HashMap;
@@ -41,8 +42,7 @@ import java.util.Map;
  */
 @InterfaceAudience.Private
 public final class KMSUtil {
-  public static final Logger LOG =
-      LoggerFactory.getLogger(KMSUtil.class);
+  public static final Logger LOG = LoggerFactory.getLogger(KMSUtil.class);
 
   private KMSUtil() { /* Hidden constructor */ }
 
@@ -64,6 +64,13 @@ public final class KMSUtil {
     if (providerUriStr == null || providerUriStr.isEmpty()) {
       return null;
     }
+    KeyProvider kp = KMSUtilFaultInjector.get().createKeyProviderForTests(
+        providerUriStr, conf);
+    if (kp != null) {
+      LOG.info("KeyProvider is created with uri: {}. This should happen only " +
+              "in tests.", providerUriStr);
+      return kp;
+    }
     return createKeyProviderFromUri(conf, URI.create(providerUriStr));
   }
 
@@ -205,4 +212,38 @@ public final class KMSUtil {
     }
     return metadata;
   }
+
+  /**
+   * Creates a key provider from token service field, which must be URI format.
+   *
+   * @param conf
+   * @param tokenServiceValue
+   * @return new KeyProvider or null
+   * @throws IOException
+   */
+  public static KeyProvider createKeyProviderFromTokenService(
+      final Configuration conf, final String tokenServiceValue)
+      throws IOException {
+    LOG.debug("Creating key provider from token service value {}. ",
+        tokenServiceValue);
+    final KeyProvider kp = KMSUtilFaultInjector.get()
+        .createKeyProviderForTests(tokenServiceValue, conf);
+    if (kp != null) {
+      LOG.info("KeyProvider is created with uri: {}. This should happen only "
+          + "in tests.", tokenServiceValue);
+      return kp;
+    }
+    if (!tokenServiceValue.contains("://")) {
+      throw new IllegalArgumentException(
+          "Invalid token service " + tokenServiceValue);
+    }
+    final URI tokenServiceUri;
+    try {
+      tokenServiceUri = new URI(tokenServiceValue);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(
+          "Invalid token service " + tokenServiceValue, e);
+    }
+    return createKeyProviderFromUri(conf, tokenServiceUri);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java
new file mode 100644
index 0000000..46d5069
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/KMSUtilFaultInjector.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+
+import java.io.IOException;
+
+/**
+ * Used for returning custom KeyProvider from test methods.
+ */
+@VisibleForTesting
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class KMSUtilFaultInjector {
+  private static KMSUtilFaultInjector instance = new KMSUtilFaultInjector();
+
+  public static KMSUtilFaultInjector get() {
+    return instance;
+  }
+
+  public static void set(KMSUtilFaultInjector injector) {
+    instance = injector;
+  }
+
+  public KeyProvider createKeyProviderForTests(String value, Configuration conf)
+      throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
 
b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
index b65f151..43d06e2 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
+++ 
b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
@@ -12,3 +12,4 @@
 #   limitations under the License.
 #
 org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSDelegationTokenIdentifier
+org.apache.hadoop.crypto.key.kms.KMSDelegationToken$KMSLegacyDelegationTokenIdentifier

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
 
b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
index 56320fb..5b6082c 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
+++ 
b/hadoop-common-project/hadoop-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
@@ -11,4 +11,5 @@
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 #
-org.apache.hadoop.crypto.key.kms.KMSClientProvider$KMSTokenRenewer
\ No newline at end of file
+org.apache.hadoop.crypto.key.kms.KMSTokenRenewer
+org.apache.hadoop.crypto.key.kms.KMSLegacyTokenRenewer
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml 
b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
index ad24f56..f32268b 100644
--- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
+++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml
@@ -2603,6 +2603,26 @@
 </property>
 
 <property>
+  <name>hadoop.security.kms.client.copy.legacy.token</name>
+  <value>true</value>
+  <description>
+    Expert only. Whether the KMS client provider should copy a token to legacy
+    kind. This is for KMS_DELEGATION_TOKEN to be backwards compatible. With the
+    default value set to true, the client will locally duplicate the
+    KMS_DELEGATION_TOKEN token and create a kms-dt token, with the service field
+    conforming to kms-dt. All other parts of the token remain the same.
+    Then the new clients will use KMS_DELEGATION_TOKEN and old clients will
+    use kms-dt to authenticate. Default value is true.
+    You should only change this to false if you know all the KMS servers
+    , clients (including both job submitters and job runners) and the
+    token renewers (usually Yarn RM) are on a version that supports
+    KMS_DELEGATION_TOKEN.
+    Turning this off prematurely may result in old clients failing to
+    authenticate with new servers.
+  </description>
+</property>
+
+<property>
   <name>hadoop.security.kms.client.failover.sleep.max.millis</name>
   <value>2000</value>
   <description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
new file mode 100644
index 0000000..56aace5
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestKMSClientProvider.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.crypto.key.kms;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.event.Level;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.net.URI;
+import java.net.URL;
+
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_KIND;
+import static org.apache.hadoop.crypto.key.kms.KMSDelegationToken.TOKEN_LEGACY_KIND;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+/**
+ * Unit test for {@link KMSClientProvider} class.
+ */
+public class TestKMSClientProvider {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TestKMSClientProvider.class);
+
+  private final Token token = new Token();
+  private final Token legacyToken = new Token();
+  private final String uriString = "kms://https@host:16000/kms";
+  private final String legacyTokenService = "host:16000";
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(30000);
+
+  {
+    GenericTestUtils.setLogLevel(KMSClientProvider.LOG, Level.TRACE);
+  }
+
+  @Before
+  public void setup() {
+    SecurityUtil.setTokenServiceUseIp(false);
+    token.setKind(TOKEN_KIND);
+    token.setService(new Text(uriString));
+    legacyToken.setKind(TOKEN_LEGACY_KIND);
+    legacyToken.setService(new Text(legacyTokenService));
+  }
+
+  @Test
+  public void testNotCopyFromLegacyToken() throws Exception {
+    final DelegationTokenAuthenticatedURL url =
+        mock(DelegationTokenAuthenticatedURL.class);
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      final KMSClientProvider spyKp = spy(kp);
+      when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
+      when(url.getDelegationToken(any(URL.class),
+          any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
+          any(String.class))).thenReturn(legacyToken);
+
+      final Credentials creds = new Credentials();
+      final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
+      LOG.info("Got tokens: {}", tokens);
+      assertEquals(1, tokens.length);
+      LOG.info("uri:" + uriString);
+      // if KMS server returned a legacy token, new client should leave the
+      // service being legacy and not set uri string
+      assertEquals(legacyTokenService, tokens[0].getService().toString());
+    } finally {
+      kp.close();
+    }
+  }
+
+  @Test
+  public void testCopyFromToken() throws Exception {
+    final DelegationTokenAuthenticatedURL url =
+        mock(DelegationTokenAuthenticatedURL.class);
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      final KMSClientProvider spyKp = spy(kp);
+      when(spyKp.createKMSAuthenticatedURL()).thenReturn(url);
+      when(url.getDelegationToken(any(URL.class),
+          any(DelegationTokenAuthenticatedURL.Token.class), any(String.class),
+          any(String.class))).thenReturn(token);
+
+      final Credentials creds = new Credentials();
+      final Token<?>[] tokens = spyKp.addDelegationTokens("yarn", creds);
+      LOG.info("Got tokens: {}", tokens);
+      assertEquals(2, tokens.length);
+      assertTrue(creds.getAllTokens().contains(token));
+      assertNotNull(creds.getToken(legacyToken.getService()));
+    } finally {
+      kp.close();
+    }
+  }
+
+  @Test
+  public void testSelectTokenWhenBothExist() throws Exception {
+    final Credentials creds = new Credentials();
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      creds.addToken(token.getService(), token);
+      creds.addToken(legacyToken.getService(), legacyToken);
+      Token t = kp.selectKMSDelegationToken(creds);
+      assertEquals(token, t);
+    } finally {
+      kp.close();
+    }
+  }
+
+  @Test
+  public void testSelectTokenLegacyService() throws Exception {
+    final Configuration conf = new Configuration();
+    final URI uri = new URI(uriString);
+    final KMSClientProvider kp = new KMSClientProvider(uri, conf, uri);
+    try {
+      Text legacyService = new Text(legacyTokenService);
+      token.setService(legacyService);
+      final Credentials creds = new Credentials();
+      creds.addToken(legacyService, token);
+      Token t = kp.selectKMSDelegationToken(creds);
+      assertEquals(token, t);
+    } finally {
+      kp.close();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
index bd68dca..e6a9fe0 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/crypto/key/kms/TestLoadBalancingKMSClientProvider.java
@@ -42,7 +42,8 @@ import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.junit.After;
+import org.apache.hadoop.util.KMSUtil;
+import org.apache.hadoop.util.KMSUtilFaultInjector;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -56,33 +57,68 @@ public class TestLoadBalancingKMSClientProvider {
     SecurityUtil.setTokenServiceUseIp(false);
   }
 
-  @After
-  public void teardown() throws IOException {
-    KMSClientProvider.fallbackDefaultPortForTesting = false;
+  private void setKMSUtilFaultInjector() {
+    KMSUtilFaultInjector injector = new KMSUtilFaultInjector() {
+      @Override
+      public KeyProvider createKeyProviderForTests(
+          String value, Configuration conf) throws IOException {
+        return TestLoadBalancingKMSClientProvider
+            .createKeyProviderForTests(value, conf);
+      }
+    };
+    KMSUtilFaultInjector.set(injector);
+  }
+
+  public static KeyProvider createKeyProviderForTests(
+      String value, Configuration conf) throws IOException {
+    // The syntax for kms servers will be
+    // kms://http@localhost:port1/kms,kms://http@localhost:port2/kms
+    if (!value.contains(",")) {
+      return null;
+    }
+    String[] keyProviderUrisStr = value.split(",");
+    KMSClientProvider[] keyProviderArr =
+        new KMSClientProvider[keyProviderUrisStr.length];
+
+    int i = 0;
+    for (String keyProviderUri: keyProviderUrisStr) {
+      KMSClientProvider kmcp =
+          new KMSClientProvider(URI.create(keyProviderUri), conf, URI
+              .create(value));
+      keyProviderArr[i] = kmcp;
+      i++;
+    }
+    LoadBalancingKMSClientProvider lbkcp =
+        new LoadBalancingKMSClientProvider(keyProviderArr, conf);
+    return lbkcp;
   }
 
   @Test
   public void testCreation() throws Exception {
     Configuration conf = new Configuration();
-    KMSClientProvider.fallbackDefaultPortForTesting = true;
     KeyProvider kp = new KMSClientProvider.Factory().createProvider(new URI(
-        "kms://http@host1/kms/foo"), conf);
+        "kms://http@host1:9600/kms/foo"), conf);
     assertTrue(kp instanceof LoadBalancingKMSClientProvider);
     KMSClientProvider[] providers =
         ((LoadBalancingKMSClientProvider) kp).getProviders();
     assertEquals(1, providers.length);
-    assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/"),
+    assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/"),
         Sets.newHashSet(providers[0].getKMSUrl()));
-
-    kp = new KMSClientProvider.Factory().createProvider(new URI(
-        "kms://http@host1;host2;host3/kms/foo"), conf);
+    setKMSUtilFaultInjector();
+    String uriStr = "kms://http@host1:9600/kms/foo," +
+        "kms://http@host2:9600/kms/foo," +
+        "kms://http@host3:9600/kms/foo";
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+        uriStr);
+    kp = KMSUtil.createKeyProvider(conf, CommonConfigurationKeysPublic
+        .HADOOP_SECURITY_KEY_PROVIDER_PATH);
     assertTrue(kp instanceof LoadBalancingKMSClientProvider);
     providers =
         ((LoadBalancingKMSClientProvider) kp).getProviders();
     assertEquals(3, providers.length);
-    assertEquals(Sets.newHashSet("http://host1/kms/foo/v1/",
-        "http://host2/kms/foo/v1/",
-        "http://host3/kms/foo/v1/"),
+    assertEquals(Sets.newHashSet("http://host1:9600/kms/foo/v1/",
+        "http://host2:9600/kms/foo/v1/",
+        "http://host3:9600/kms/foo/v1/"),
         Sets.newHashSet(providers[0].getKMSUrl(),
             providers[1].getKMSUrl(),
             providers[2].getKMSUrl()));
@@ -208,7 +244,7 @@ public class TestLoadBalancingKMSClientProvider {
 
   private class MyKMSClientProvider extends KMSClientProvider {
     public MyKMSClientProvider(URI uri, Configuration conf) throws IOException {
-      super(uri, conf);
+      super(uri, conf, uri);
     }
 
     @Override
@@ -245,9 +281,8 @@ public class TestLoadBalancingKMSClientProvider {
   @Test
   public void testClassCastException() throws Exception {
     Configuration conf = new Configuration();
-    KMSClientProvider.fallbackDefaultPortForTesting = true;
     KMSClientProvider p1 = new MyKMSClientProvider(
-        new URI("kms://http@host1/kms/foo"), conf);
+        new URI("kms://http@host1:9600/kms/foo"), conf);
     LoadBalancingKMSClientProvider kp = new LoadBalancingKMSClientProvider(
         new KMSClientProvider[] {p1}, 0, conf);
     try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/583fa6ed/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java
new file mode 100644
index 0000000..77f52ee
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestKMSUtil.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.key.KeyProvider;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Test {@link KMSUtil}.
+ */
+public class TestKMSUtil {
+
+  public static final Logger LOG = LoggerFactory.getLogger(TestKMSUtil.class);
+
+  @Rule
+  public Timeout globalTimeout = new Timeout(90000);
+
+  @Test
+  public void testCreateKeyProviderFromTokenService() throws Exception {
+    final Configuration conf = new Configuration();
+    KeyProvider kp = KMSUtil.createKeyProviderFromTokenService(conf,
+        "kms://https@localhost:9600/kms");
+    assertNotNull(kp);
+    kp.close();
+
+    kp = KMSUtil.createKeyProviderFromTokenService(conf,
+        "kms://https@localhost:9600/kms,kms://localhost1:9600/kms");
+    assertNotNull(kp);
+    kp.close();
+
+    String invalidService = "whatever:9600";
+    try {
+      KMSUtil.createKeyProviderFromTokenService(conf, invalidService);
+    } catch (Exception ex) {
+      LOG.info("Expected exception:", ex);
+      assertTrue(ex instanceof IllegalArgumentException);
+      GenericTestUtils.assertExceptionContains(
+          "Invalid token service " + invalidService, ex);
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to