http://git-wip-us.apache.org/repos/asf/hadoop/blob/59767be1/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 904e597..c1695f1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -17,16 +17,23 @@
  */
 package org.apache.hadoop.ozone;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.slf4j.event.Level.INFO;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.InetAddress;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.security.KeyPair;
+import java.security.PrivilegedExceptionAction;
 import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -35,14 +42,29 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.server.SCMStorage;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
+import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyPEMWriter;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMStorage;
 import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.security.KerberosAuthException;
+import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.util.KerberosUtil;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
 import org.apache.hadoop.test.LambdaTestUtils;
@@ -52,6 +74,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
+import org.mockito.Mockito;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -61,6 +84,8 @@ import org.slf4j.LoggerFactory;
 @InterfaceAudience.Private
 public final class TestSecureOzoneCluster {
 
+  private static final String TEST_USER = "testUgiUser";
+  private static final int CLIENT_TIMEOUT = 2 * 1000;
   private Logger LOGGER = LoggerFactory
       .getLogger(TestSecureOzoneCluster.class);
 
@@ -81,14 +106,24 @@ public final class TestSecureOzoneCluster {
   private static String clusterId;
   private static String scmId;
   private static String omId;
+  private OzoneManagerProtocolClientSideTranslatorPB omClient;
+  private KeyPair keyPair;
+  private Path metaDirPath;
 
   @Before
   public void init() {
     try {
       conf = new OzoneConfiguration();
+      conf.set(ScmConfigKeys.OZONE_SCM_CLIENT_ADDRESS_KEY, "localhost");
+      DefaultMetricsSystem.setMiniClusterMode(true);
+      final String path = GenericTestUtils
+          .getTempPath(UUID.randomUUID().toString());
+      metaDirPath = Paths.get(path, "om-meta");
+      conf.set(OZONE_METADATA_DIRS, metaDirPath.toString());
       startMiniKdc();
       setSecureConfig(conf);
       createCredentialsInKDC(conf, miniKdc);
+      generateKeyPair(conf);
     } catch (IOException e) {
       LOGGER.error("Failed to initialize TestSecureOzoneCluster", e);
     } catch (Exception e) {
@@ -106,6 +141,10 @@ public final class TestSecureOzoneCluster {
       if (om != null) {
         om.stop();
       }
+      if (omClient != null) {
+        omClient.close();
+      }
+      FileUtils.deleteQuietly(metaDirPath.toFile());
     } catch (Exception e) {
       LOGGER.error("Failed to stop TestSecureOzoneCluster", e);
     }
@@ -115,11 +154,11 @@ public final class TestSecureOzoneCluster {
       throws Exception {
     createPrincipal(scmKeytab,
         conf.get(ScmConfigKeys.HDDS_SCM_KERBEROS_PRINCIPAL_KEY));
-     createPrincipal(spnegoKeytab,
-         conf.get(ScmConfigKeys
-             .HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
-        conf.get(OMConfigKeys
-            .OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
+    createPrincipal(spnegoKeytab,
+        conf.get(ScmConfigKeys
+            .HDDS_SCM_HTTP_KERBEROS_PRINCIPAL_KEY));
+    conf.get(OMConfigKeys
+        .OZONE_OM_HTTP_KERBEROS_PRINCIPAL_KEY);
     createPrincipal(omKeyTab,
         conf.get(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY));
   }
@@ -137,13 +176,14 @@ public final class TestSecureOzoneCluster {
     miniKdc.start();
   }
 
-  private void stopMiniKdc() throws Exception {
+  private void stopMiniKdc() {
     miniKdc.stop();
   }
 
   private void setSecureConfig(Configuration conf) throws IOException {
     conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
-    String host = KerberosUtil.getLocalHostName();
+    conf.setBoolean(OZONE_ENABLED, true);
+    String host = InetAddress.getLocalHost().getCanonicalHostName();
     String realm = miniKdc.getRealm();
     curUser = UserGroupInformation.getCurrentUser()
         .getUserName();
@@ -195,7 +235,7 @@ public final class TestSecureOzoneCluster {
     final String path = GenericTestUtils
         .getTempPath(UUID.randomUUID().toString());
     Path scmPath = Paths.get(path, "scm-meta");
-    conf.set(OzoneConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
+    conf.set(OZONE_METADATA_DIRS, scmPath.toString());
     conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
     SCMStorage scmStore = new SCMStorage(conf);
     scmStore.setClusterId(clusterId);
@@ -245,60 +285,261 @@ public final class TestSecureOzoneCluster {
   }
 
   /**
-   * Tests the secure KSM Initialization Failure.
+   * Tests the secure om Initialization Failure.
    *
    * @throws IOException
    */
   @Test
-  public void testSecureKsmInitializationFailure() throws Exception {
+  public void testSecureOMInitializationFailure() throws Exception {
     initSCM();
     // Create a secure SCM instance as om client will connect to it
     scm = StorageContainerManager.createSCM(null, conf);
 
-    final String path = GenericTestUtils
-        .getTempPath(UUID.randomUUID().toString());
-    OMStorage ksmStore = new OMStorage(conf);
-    ksmStore.setClusterId("testClusterId");
-    ksmStore.setScmId("testScmId");
-    // writes the version file properties
-    ksmStore.initialize();
+    setupOm(conf);
     conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
         "non-existent-u...@example.com");
     testCommonKerberosFailures(() -> OzoneManager.createOm(null, conf));
   }
 
   /**
-   * Tests the secure KSM Initialization success.
+   * Tests the secure om Initialization success.
    *
    * @throws IOException
    */
   @Test
-  public void testSecureKsmInitializationSuccess() throws Exception {
+  public void testSecureOmInitializationSuccess() throws Exception {
     initSCM();
     // Create a secure SCM instance as om client will connect to it
     scm = StorageContainerManager.createSCM(null, conf);
     LogCapturer logs = LogCapturer.captureLogs(OzoneManager.LOG);
+    GenericTestUtils.setLogLevel(OzoneManager.LOG, INFO);
+
+    setupOm(conf);
+    try {
+      om.start();
+    } catch (Exception ex) {
+      // Expects timeout failure from scmClient in om but om user login via
+      // kerberos should succeed.
+      Assert.assertTrue(logs.getOutput().contains("Ozone Manager login"
+          + " successful"));
+    }
+  }
+
+  /**
+   * Performs following tests for delegation token.
+   * 1. Get valid delegation token
+   * 2. Test successful token renewal.
+   * 3. Client can authenticate using token.
+   * 4. Delegation token renewal without Kerberos auth fails.
+   * 5. Test success of token cancellation.
+   * 6. Test failure of token cancellation.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDelegationToken() throws Exception {
+
+    // Capture logs for assertions
+    LogCapturer logs = LogCapturer.captureLogs(Server.AUDITLOG);
     GenericTestUtils
-        .setLogLevel(LoggerFactory.getLogger(OzoneManager.class.getName()),
-            org.slf4j.event.Level.INFO);
+        .setLogLevel(LoggerFactory.getLogger(Server.class.getName()), INFO);
+
+    // Setup secure OM for start
+    setupOm(conf);
+    long omVersion =
+        RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+    // Start OM
+    om.start();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String username = ugi.getUserName();
+    ugi.setAuthenticationMethod(AuthenticationMethod.KERBEROS);
+
+    // Get first OM client which will authenticate via Kerberos
+    omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+        RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+            OmUtils.getOmAddress(conf), ugi, conf,
+            NetUtils.getDefaultSocketFactory(conf),
+            CLIENT_TIMEOUT));
+
+    // Assert if auth was successful via Kerberos
+    Assert.assertFalse(logs.getOutput().contains(
+        "Auth successful for " + username + " (auth:KERBEROS)"));
+
+    // Case 1: Test successful delegation token.
+    Token<OzoneTokenIdentifier> token = omClient
+        .getDelegationToken(new Text("om"));
+
+    // Case 2: Test successful token renewal.
+    long renewalTime = omClient.renewDelegationToken(token);
+    Assert.assertTrue(renewalTime > 0);
+
+    // Check if token is of right kind and renewer is running om instance
+    Assert.assertEquals(token.getKind().toString(), "OzoneToken");
+    Assert.assertEquals(token.getService().toString(),
+        OmUtils.getOmRpcAddress(conf));
+    omClient.close();
+
+    // Create a remote ugi and set its authentication method to Token
+    UserGroupInformation testUser = UserGroupInformation
+        .createRemoteUser(TEST_USER);
+    testUser.addToken(token);
+    testUser.setAuthenticationMethod(AuthMethod.TOKEN);
+    UserGroupInformation.setLoginUser(testUser);
+
+    // Get Om client, this time authentication should happen via Token
+    testUser.doAs(new PrivilegedExceptionAction<Void>() {
+      @Override
+      public Void run() throws Exception {
+        omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+            RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+                OmUtils.getOmAddress(conf), testUser, conf,
+                NetUtils.getDefaultSocketFactory(conf), CLIENT_TIMEOUT));
+        return null;
+      }
+    });
+
+    // Case 3: Test Client can authenticate using token.
+    Assert.assertFalse(logs.getOutput().contains(
+        "Auth successful for " + username + " (auth:TOKEN)"));
+    LambdaTestUtils.intercept(IOException.class, "Delete Volume failed," +
+            " error:VOLUME_NOT_FOUND",
+        () -> omClient.deleteVolume("vol1"));
+    Assert.assertTrue(logs.getOutput().contains(
+        "Auth successful for " + username + " (auth:TOKEN)"));
+
+    // Case 4: Test failure of token renewal.
+    // Call to renewDelegationToken will fail but it will confirm that
+    // initial connection via DT succeeded
+    LambdaTestUtils.intercept(RemoteException.class, "Delegation "
+            + "Token can be renewed only with kerberos or web authentication",
+        () -> omClient.renewDelegationToken(token));
+    Assert.assertTrue(logs.getOutput().contains(
+        "Auth successful for " + username + " (auth:TOKEN)"));
+    //testUser.setAuthenticationMethod(AuthMethod.KERBEROS);
+    UserGroupInformation.setLoginUser(ugi);
+    omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+        RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+            OmUtils.getOmAddress(conf), ugi, conf,
+            NetUtils.getDefaultSocketFactory(conf),
+            Client.getRpcTimeout(conf)));
+
+    // Case 5: Test success of token cancellation.
+    omClient.cancelDelegationToken(token);
+    omClient.close();
+
+    // Wait for client to timeout
+    Thread.sleep(CLIENT_TIMEOUT);
+
+    Assert.assertFalse(logs.getOutput().contains("Auth failed for"));
+
+    // Case 6: Test failure of token cancellation.
+    // Get Om client, this time authentication using Token will fail as
+    // token is expired
+    omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+        RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+            OmUtils.getOmAddress(conf), testUser, conf,
+            NetUtils.getDefaultSocketFactory(conf),
+            Client.getRpcTimeout(conf)));
+    LambdaTestUtils.intercept(RemoteException.class, "can't be found in cache",
+        () -> omClient.cancelDelegationToken(token));
+    Assert.assertTrue(logs.getOutput().contains(
+        "Auth failed for"));
+  }
 
-    final String path = GenericTestUtils
-        .getTempPath(UUID.randomUUID().toString());
-    Path metaDirPath = Paths.get(path, "om-meta");
+  private void generateKeyPair(OzoneConfiguration config) throws Exception {
+    HDDSKeyGenerator keyGenerator = new HDDSKeyGenerator(conf);
+    keyPair = keyGenerator.generateKey();
+    HDDSKeyPEMWriter pemWriter = new HDDSKeyPEMWriter(config);
+    pemWriter.writeKey(keyPair, true);
+  }
 
-    OMStorage omStore = new OMStorage(conf);
+  /**
+   * Tests delegation token renewal.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDelegationTokenRenewal() throws Exception {
+    // Capture logs for assertions.
+    LogCapturer logs = LogCapturer.captureLogs(Server.AUDITLOG);
+    GenericTestUtils
+        .setLogLevel(LoggerFactory.getLogger(Server.class.getName()), INFO);
+
+    // Setup secure OM for start.
+    OzoneConfiguration newConf = new OzoneConfiguration(conf);
+    newConf.setLong(OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_KEY, 500);
+    setupOm(newConf);
+    long omVersion =
+        RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+    OzoneManager.setTestSecureOmFlag(true);
+    // Start OM
+
+    om.start();
+
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    String username = ugi.getUserName();
+
+    // Get first OM client which will authenticate via Kerberos
+    omClient = new OzoneManagerProtocolClientSideTranslatorPB(
+        RPC.getProxy(OzoneManagerProtocolPB.class, omVersion,
+            OmUtils.getOmAddress(conf), ugi, conf,
+            NetUtils.getDefaultSocketFactory(conf),
+            CLIENT_TIMEOUT));
+
+    // Since client is already connected get a delegation token
+    Token<OzoneTokenIdentifier> token = omClient
+        .getDelegationToken(new Text("om"));
+
+    // Check if token is of right kind and renewer is running om instance
+    Assert.assertEquals(token.getKind().toString(), "OzoneToken");
+    Assert.assertEquals(token.getService().toString(),
+        OmUtils.getOmRpcAddress(conf));
+
+    // Renew delegation token
+    long expiryTime = omClient.renewDelegationToken(token);
+    Assert.assertTrue(expiryTime > 0);
+
+    // Test failure of delegation renewal
+    // 1. When renewer doesn't match (implicitly covers when renewer is
+    // null or empty )
+    Token token2 = omClient.getDelegationToken(new Text("randomService"));
+    LambdaTestUtils.intercept(RemoteException.class,
+        " with non-matching renewer randomService",
+        () -> omClient.renewDelegationToken(token2));
+
+    // 2. Test tampered token
+    OzoneTokenIdentifier tokenId = OzoneTokenIdentifier
+        .readProtoBuf(token.getIdentifier());
+    tokenId.setRenewer(new Text("om"));
+    tokenId.setMaxDate(System.currentTimeMillis() * 2);
+    Token<OzoneTokenIdentifier> tamperedToken = new Token<>(
+        tokenId.getBytes(), token2.getPassword(), token2.getKind(),
+        token2.getService());
+    LambdaTestUtils
+        .intercept(RemoteException.class, "can't be found in cache",
+            () -> omClient.renewDelegationToken(tamperedToken));
+
+    // 3. When token maxExpiryTime exceeds
+    Thread.sleep(500);
+    LambdaTestUtils
+        .intercept(RemoteException.class, "om tried to renew an expired"
+            + " token", () -> omClient.renewDelegationToken(token));
+  }
+
+  private void setupOm(OzoneConfiguration config) throws Exception {
+    OMStorage omStore = new OMStorage(config);
     omStore.setClusterId("testClusterId");
     omStore.setScmId("testScmId");
     // writes the version file properties
     omStore.initialize();
-    try {
-      om = OzoneManager.createOm(null, conf);
-    } catch (Exception ex) {
-      // Expects timeout failure from scmClient in KSM but KSM user login via
-      // kerberos should succeed
-      Assert.assertTrue(
-          logs.getOutput().contains("Ozone Manager login successful."));
-    }
+    OzoneManager.setTestSecureOmFlag(true);
+    om = OzoneManager.createOm(null, config);
+    CertificateClient certClient = Mockito.mock(CertificateClient.class);
+    Mockito.when(certClient.getPrivateKey("om"))
+        .thenReturn(keyPair.getPrivate());
+    Mockito.when(certClient.getPublicKey("om"))
+        .thenReturn(keyPair.getPublic());
+    om.setCertClient(certClient);
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59767be1/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index a5cec20..c6616aa 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -23,6 +23,8 @@ import com.fasterxml.jackson.databind.ObjectWriter;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.protobuf.BlockingService;
+import java.security.KeyPair;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -35,12 +37,20 @@ import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideT
 import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.ozone.security.OzoneSecurityException;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.ozone.security.OzoneSecretManager;
+import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.NetUtils;
@@ -53,7 +63,6 @@ import org.apache.hadoop.ozone.audit.AuditLoggerType;
 import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.OMAction;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.common.Storage.StorageState;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@@ -74,6 +83,8 @@ import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.GenericOptionsParser;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
@@ -99,6 +110,7 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.ozone.security.OzoneSecurityException.ResultCodes.*;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients;
 import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients;
 import static org.apache.hadoop.hdds.HddsUtils.isHddsEnabled;
@@ -134,13 +146,21 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   public static final Logger LOG =
       LoggerFactory.getLogger(OzoneManager.class);
 
-  private static final AuditLogger AUDIT =
-      new AuditLogger(AuditLoggerType.OMLOGGER);
+  private static final AuditLogger AUDIT = new AuditLogger(
+      AuditLoggerType.OMLOGGER);
 
   private static final String USAGE =
       "Usage: \n ozone om [genericOptions] " + "[ "
           + StartupOption.INIT.getName() + " ]\n " + "ozone om [ "
           + StartupOption.HELP.getName() + " ]\n";
+  private static final String OM_DAEMON = "om";
+  private static boolean securityEnabled = false;
+  private static OzoneSecretManager secretManager;
+  // TODO: For testing purposes only, remove before committing.
+  private KeyPair keyPair;
+  private CertificateClient certClient;
+  private static boolean testSecureOmFlag = false;
+  private final Text omRpcAddressTxt;
   private final OzoneConfiguration configuration;
   private RPC.Server omRpcServer;
   private InetSocketAddress omRpcAddress;
@@ -174,21 +194,39 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
           ResultCodes.OM_NOT_INITIALIZED);
     }
 
-    scmContainerClient = getScmContainerClient(configuration);
-
-    // verifies that the SCM info in the OM Version file is correct.
-    scmBlockClient = getScmBlockClient(configuration);
-
-    ScmInfo scmInfo = scmBlockClient.getScmInfo();
-    if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
-        .getScmId().equals(omStorage.getScmId()))) {
-      throw new OMException("SCM version info mismatch.",
-          ResultCodes.SCM_VERSION_MISMATCH_ERROR);
+    if (!testSecureOmFlag) {
+      scmContainerClient = getScmContainerClient(configuration);
+      // verifies that the SCM info in the OM Version file is correct.
+      scmBlockClient = getScmBlockClient(configuration);
+      ScmInfo scmInfo = scmBlockClient.getScmInfo();
+      if (!(scmInfo.getClusterId().equals(omStorage.getClusterID()) && scmInfo
+          .getScmId().equals(omStorage.getScmId()))) {
+        throw new OMException("SCM version info mismatch.",
+            ResultCodes.SCM_VERSION_MISMATCH_ERROR);
+      }
+    } else {
+      // For testing purpose only
+      scmContainerClient = null;
+      scmBlockClient = null;
     }
+    final int handlerCount = conf.getInt(OZONE_OM_HANDLER_COUNT_KEY,
+        OZONE_OM_HANDLER_COUNT_DEFAULT);
 
     RPC.setProtocolEngine(configuration, OzoneManagerProtocolPB.class,
         ProtobufRpcEngine.class);
 
+    BlockingService omService = newReflectiveBlockingService(
+        new OzoneManagerProtocolServerSideTranslatorPB(this));
+    final InetSocketAddress omNodeRpcAddr =
+        getOmAddress(configuration);
+    omRpcAddressTxt = new Text(OmUtils.getOmRpcAddress(configuration));
+    secretManager = createSecretManager(configuration);
+
+    omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
+        OzoneManagerProtocolPB.class, omService,
+        handlerCount);
+    omRpcAddress = updateRPCListenAddress(configuration,
+        OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
     metadataManager = new OmMetadataManagerImpl(configuration);
     volumeManager = new VolumeManagerImpl(metadataManager, configuration);
     bucketManager = new BucketManagerImpl(metadataManager);
@@ -258,6 +296,65 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   }
 
 
+  private OzoneSecretManager createSecretManager(
+      OzoneConfiguration conf)
+      throws IOException {
+    long tokenRemoverScanInterval =
+        conf.getTimeDuration(OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_KEY,
+            OMConfigKeys.DELEGATION_REMOVER_SCAN_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    long tokenMaxLifetime =
+        conf.getTimeDuration(OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_KEY,
+            OMConfigKeys.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    long tokenRenewInterval =
+        conf.getTimeDuration(OMConfigKeys.DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
+            OMConfigKeys.DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    return new OzoneSecretManager(conf, tokenMaxLifetime, tokenRenewInterval,
+        tokenRemoverScanInterval, omRpcAddressTxt);
+  }
+
+  private void stopSecretManager() throws IOException {
+    if (secretManager != null) {
+      LOG.info("Stopping OM secret manager");
+      secretManager.stop();
+    }
+  }
+
+  private void startSecretManager() {
+    if (secretManager != null) {
+      try {
+        readKeyPair();
+        LOG.info("Starting OM secret manager");
+        secretManager.startThreads(keyPair);
+      } catch (IOException e) {
+        // Inability to start secret manager
+        // can't be recovered from.
+        LOG.error("Error starting secret manager.", e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public void setCertClient(CertificateClient certClient) {
+    // TODO: Initialize it in constructor with implementation for certClient.
+    this.certClient = certClient;
+  }
+
+  /**
+   * Build the key pair from the keys returned by the certificate client.
+   */
+  private void readKeyPair() throws OzoneSecurityException {
+    try {
+      keyPair = new KeyPair(certClient.getPublicKey(OM_DAEMON),
+          certClient.getPrivateKey(OM_DAEMON));
+    } catch (Exception e) {
+      throw new OzoneSecurityException("Error reading private file for "
+          + "OzoneManager", e, OM_PUBLIC_PRIVATE_KEY_FILE_NOT_EXIST);
+    }
+  }
+
   /**
    * Login OM service user if security and Kerberos are enabled.
    *
@@ -267,8 +364,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private static void loginOMUser(OzoneConfiguration conf)
       throws IOException, AuthenticationException {
 
-    if (SecurityUtil.getAuthenticationMethod(conf).equals
-        (AuthenticationMethod.KERBEROS)) {
+    if (SecurityUtil.getAuthenticationMethod(conf).equals(
+        AuthenticationMethod.KERBEROS)) {
       LOG.debug("Ozone security is enabled. Attempting login for OM user. "
               + "Principal: {},keytab: {}", conf.get(
           OZONE_OM_KERBEROS_PRINCIPAL_KEY),
@@ -280,8 +377,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       SecurityUtil.login(conf, OZONE_OM_KERBEROS_KEYTAB_FILE_KEY,
           OZONE_OM_KERBEROS_PRINCIPAL_KEY, socAddr.getHostName());
     } else {
-      throw new AuthenticationException(SecurityUtil.getAuthenticationMethod
-          (conf) + " authentication method not supported. OM user login "
+      throw new AuthenticationException(SecurityUtil.getAuthenticationMethod(
+          conf) + " authentication method not supported. OM user login "
           + "failed.");
     }
     LOG.info("Ozone Manager login successful.");
@@ -354,7 +451,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         .setPort(addr.getPort())
         .setNumHandlers(handlerCount)
         .setVerbose(false)
-        .setSecretManager(null)
+        .setSecretManager(secretManager)
         .build();
 
     DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
@@ -394,6 +491,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     out.println(USAGE + "\n");
   }
 
+  private static boolean isOzoneSecurityEnabled() {
+    return securityEnabled;
+  }
+
   /**
    * Constructs OM instance based on command line arguments.
    *
@@ -438,8 +539,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       terminate(1);
       return null;
     }
+
+    securityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
     // Authenticate KSM if security is enabled
-    if (conf.getBoolean(OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY, true)) {
+    if (securityEnabled) {
       loginOMUser(conf);
     }
     switch (startOpt) {
@@ -572,19 +675,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    * Start service.
    */
   public void start() throws IOException {
-
-    InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
-    int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
-        OZONE_OM_HANDLER_COUNT_DEFAULT);
-    BlockingService omService = newReflectiveBlockingService(
-        new OzoneManagerProtocolServerSideTranslatorPB(this));
-    omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
-        OzoneManagerProtocolPB.class, omService,
-        handlerCount);
-    omRpcAddress = updateRPCListenAddress(configuration,
-        OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
-    omRpcServer.start();
-
     LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
         omRpcAddress));
 
@@ -592,7 +682,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     DefaultMetricsSystem.initialize("OzoneManager");
 
     metadataManager.start(configuration);
-
+    startSecretManagerIfNecessary();
 
     // Set metrics and start metrics back ground thread
     metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
@@ -613,8 +703,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     metricsTimer.schedule(scheduleOMMetricsWriteTask, 0, period);
 
     keyManager.start(configuration);
-
-    httpServer = new OzoneManagerHttpServer(configuration, this);
+    omRpcServer.start();
     try {
       httpServer.start();
     } catch (Exception ex) {
@@ -636,6 +725,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       scheduleOMMetricsWriteTask = null;
       omRpcServer.stop();
       keyManager.stop();
+      stopSecretManager();
       httpServer.stop();
       metadataManager.stop();
       metrics.unRegister();
@@ -657,6 +747,140 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     }
   }
 
+  /**
+   * Starts the secret manager when delegation tokens are in use
+   * ({@link #shouldUseDelegationTokens()}), Ozone security is enabled and
+   * the secret manager is not already running.
+   */
+  private void startSecretManagerIfNecessary() {
+    boolean shouldRun = shouldUseDelegationTokens()
+        && isOzoneSecurityEnabled();
+    // Only consult the secret manager once we know it is required, so a
+    // deployment without security never dereferences it here.
+    if (shouldRun && !secretManager.isRunning()) {
+      startSecretManager();
+    }
+  }
+
+  /**
+   * Tells whether delegation tokens should be used.
+   *
+   * @return the value reported by
+   *         {@code UserGroupInformation.isSecurityEnabled()}
+   */
+  private boolean shouldUseDelegationTokens() {
+    final boolean hadoopSecurityOn = UserGroupInformation.isSecurityEnabled();
+    return hadoopSecurityOn;
+  }
+
+
+  /**
+   * Checks whether the current connection may perform delegation token
+   * operations. With security disabled every caller is allowed; otherwise
+   * the connection must be authenticated via KERBEROS, KERBEROS_SSL or
+   * CERTIFICATE.
+   *
+   * @return true if delegation token operation is allowed
+   * @throws IOException if the connection's authentication method cannot be
+   *         determined
+   */
+  private boolean isAllowedDelegationTokenOp() throws IOException {
+    // Resolved first, exactly as before, so lookup failures still surface.
+    final AuthenticationMethod method = getConnectionAuthenticationMethod();
+    if (!UserGroupInformation.isSecurityEnabled()) {
+      return true;
+    }
+    return method == AuthenticationMethod.KERBEROS
+        || method == AuthenticationMethod.KERBEROS_SSL
+        || method == AuthenticationMethod.CERTIFICATE;
+  }
+
+  /**
+   * Determines how the current connection was authenticated. For a PROXY
+   * user the authentication method of the real user is reported instead.
+   *
+   * @return AuthenticationMethod used to establish connection
+   * @throws IOException if the calling user cannot be resolved
+   */
+  private AuthenticationMethod getConnectionAuthenticationMethod()
+      throws IOException {
+    final UserGroupInformation caller = getRemoteUser();
+    final AuthenticationMethod method = caller.getAuthenticationMethod();
+    return (method == AuthenticationMethod.PROXY)
+        ? caller.getRealUser().getAuthenticationMethod()
+        : method;
+  }
+
+  /**
+   * Resolves the calling user. The RPC server's remote user is preferred to
+   * avoid a trip through UGI.getCurrentUser, which is synch'ed; the current
+   * user is the fallback when no remote user is available.
+   *
+   * @return the remote user of the RPC call, or the current user if none
+   * @throws IOException if the current user cannot be obtained
+   */
+  private static UserGroupInformation getRemoteUser() throws IOException {
+    final UserGroupInformation rpcCaller = Server.getRemoteUser();
+    if (rpcCaller != null) {
+      return rpcCaller;
+    }
+    return UserGroupInformation.getCurrentUser();
+  }
+
+  /**
+   * Get delegation token from OzoneManager.
+   *
+   * @param renewer Renewer information
+   * @return delegation token created by the secret manager, or {@code null}
+   *         when no secret manager is running in this OM
+   * @throws IOException if the connection's authentication method does not
+   *         permit delegation token operations, or on other errors
+   */
+  @Override
+  public Token<OzoneTokenIdentifier> getDelegationToken(Text renewer)
+      throws IOException {
+    if (!isAllowedDelegationTokenOp()) {
+      throw new IOException("Delegation Token can be issued only with "
+          + "kerberos or web authentication");
+    }
+    if (secretManager == null || !secretManager.isRunning()) {
+      LOG.warn("trying to get DT with no secret manager running in OM.");
+      return null;
+    }
+
+    UserGroupInformation ugi = getRemoteUser();
+    Text owner = new Text(ugi.getUserName());
+    // The real user is only set when the caller acts on behalf of another
+    // user; it stays null otherwise.
+    Text realUser = null;
+    if (ugi.getRealUser() != null) {
+      realUser = new Text(ugi.getRealUser().getUserName());
+    }
+
+    return secretManager.createToken(owner, renewer, realUser);
+  }
+
+  /**
+   * Renews a delegationToken issued by OzoneManager on behalf of the calling
+   * user. An access-control failure is logged together with the decoded
+   * token identifier and then rethrown.
+   *
+   * @param token token to renew
+   * @return new expiryTime of the token
+   * @throws InvalidToken if {@code token} is invalid
+   * @throws IOException on other errors
+   */
+  @Override
+  public long renewDelegationToken(Token<OzoneTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    try {
+      if (!isAllowedDelegationTokenOp()) {
+        throw new IOException("Delegation Token can be renewed only with "
+            + "kerberos or web authentication");
+      }
+      final String renewer = getRemoteUser().getShortUserName();
+      return secretManager.renewToken(token, renewer);
+    } catch (AccessControlException ace) {
+      final String tokenText = OzoneTokenIdentifier
+          .readProtoBuf(token.getIdentifier()).toString();
+      LOG.error("Delegation token renewal failed for dt: {}, cause: {}",
+          tokenText, ace.getMessage());
+      throw ace;
+    }
+  }
+
+  /**
+   * Cancels a delegation token on behalf of the calling user.
+   *
+   * @param token token to cancel
+   * @throws IOException on error; an {@code AccessControlException} raised
+   *         while cancelling is logged and rethrown unchanged
+   */
+  @Override
+  public void cancelDelegationToken(Token<OzoneTokenIdentifier> token)
+      throws IOException {
+    OzoneTokenIdentifier id = null;
+    try {
+      String canceller = getRemoteUser().getUserName();
+      id = secretManager.cancelToken(token, canceller);
+      LOG.trace("Delegation token cancelled for dt: {}", id);
+    } catch (AccessControlException ace) {
+      // NOTE(review): id is only assigned when cancelToken returns, so it is
+      // still null on this path; consider logging the token itself.
+      LOG.error("Delegation token cancellation failed for dt: {}, cause: {}",
+          id, ace.getMessage());
+      throw ace;
+    }
+  }
   /**
    * Creates a volume.
    *
@@ -1430,4 +1654,8 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
       return name;
     }
   }
+
+  /**
+   * Sets the static {@code testSecureOmFlag} field of OzoneManager.
+   * NOTE(review): judging by the name this is a test-only hook for secure OM
+   * tests; confirm against where the flag is read.
+   *
+   * @param testSecureOmFlag new value of the flag
+   */
+  public static void setTestSecureOmFlag(boolean testSecureOmFlag) {
+    OzoneManager.testSecureOmFlag = testSecureOmFlag;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/59767be1/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 64806ed..e2536d3 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -119,6 +119,15 @@ import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .SetVolumePropertyResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .Status;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import 
org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
+import 
org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenResponseProto;
+import 
org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
+import 
org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenResponseProto;
+import 
org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
+import 
org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenResponseProto;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.io.Text;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -651,4 +660,54 @@ public class OzoneManagerProtocolServerSideTranslatorPB 
implements
     }
     return resp.build();
   }
+
+  /**
+   * Serves a get-delegation-token request by delegating to {@code impl}.
+   * When {@code impl} returns no token, the default (empty) response is sent.
+   *
+   * @param controller RPC controller (unused here)
+   * @param request request carrying the renewer
+   * @return response holding the token, or the default instance if none
+   * @throws ServiceException wrapping any IOException from {@code impl}
+   */
+  @Override
+  public GetDelegationTokenResponseProto getDelegationToken(
+      RpcController controller, GetDelegationTokenRequestProto request)
+      throws ServiceException {
+    try {
+      final Text renewer = new Text(request.getRenewer());
+      final Token<OzoneTokenIdentifier> issued =
+          impl.getDelegationToken(renewer);
+      if (issued == null) {
+        return GetDelegationTokenResponseProto.getDefaultInstance();
+      }
+      return GetDelegationTokenResponseProto.newBuilder()
+          .setToken(OMPBHelper.convertToTokenProto(issued))
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  /**
+   * Serves a renew-delegation-token request by delegating to {@code impl}.
+   * A request without a token yields the default (empty) response.
+   *
+   * @param controller RPC controller (unused here)
+   * @param request request optionally carrying the token to renew
+   * @return response holding the new expiry time, or the default instance
+   * @throws ServiceException wrapping any IOException from {@code impl}
+   */
+  @Override
+  public RenewDelegationTokenResponseProto renewDelegationToken(
+      RpcController controller, RenewDelegationTokenRequestProto request)
+      throws ServiceException {
+    try {
+      if (!request.hasToken()) {
+        return RenewDelegationTokenResponseProto.getDefaultInstance();
+      }
+      final long newExpiry = impl.renewDelegationToken(
+          OMPBHelper.convertToDelegationToken(request.getToken()));
+      return RenewDelegationTokenResponseProto.newBuilder()
+          .setNewExpiryTime(newExpiry)
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  /**
+   * Serves a cancel-delegation-token request by delegating to {@code impl}.
+   * A request without a token is a no-op; the default response is returned
+   * in either case.
+   *
+   * @param controller RPC controller (unused here)
+   * @param req request optionally carrying the token to cancel
+   * @return the default response instance
+   * @throws ServiceException wrapping any IOException from {@code impl}
+   */
+  @Override
+  public CancelDelegationTokenResponseProto cancelDelegationToken(
+      RpcController controller, CancelDelegationTokenRequestProto req)
+      throws ServiceException {
+    try {
+      if (req.hasToken()) {
+        final Token<OzoneTokenIdentifier> toCancel =
+            OMPBHelper.convertToDelegationToken(req.getToken());
+        impl.cancelDelegationToken(toCancel);
+      }
+      return CancelDelegationTokenResponseProto.getDefaultInstance();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to