This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d072d33  HDDS-2020. Remove mTLS from Ozone GRPC. Contributed by Xiaoyu 
Yao.
d072d33 is described below

commit d072d3304ce3fe33e22bb703839b41ab5107ad42
Author: Xiaoyu Yao <x...@apache.org>
AuthorDate: Wed Aug 28 08:56:33 2019 -0700

    HDDS-2020. Remove mTLS from Ozone GRPC. Contributed by Xiaoyu Yao.
    
    Signed-off-by: Anu Engineer <aengin...@apache.org>
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  32 +++--
 .../hadoop/hdds/scm/XceiverClientManager.java      |  28 ++++-
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java |   9 +-
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  26 +---
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  60 ++++++---
 .../hadoop/hdds/security/x509/SecurityConfig.java  | 137 ---------------------
 .../x509/certificate/client/CertificateClient.java |   6 +
 .../client/DefaultCertificateClient.java           |  31 ++++-
 .../common/src/main/resources/ozone-default.xml    |  27 ----
 .../container/common/helpers/ContainerMetrics.java |   9 +-
 .../common/transport/server/XceiverServerGrpc.java |  16 +--
 .../transport/server/ratis/XceiverServerRatis.java |   4 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   1 +
 .../apache/hadoop/hdds/server/ProfileServlet.java  |   1 -
 .../hadoop/hdds/server/TestProfileServlet.java     |  11 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |   5 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   2 +
 .../hdds/scm/pipeline/PipelineReportHandler.java   |   3 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   9 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |  15 +--
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  15 ++-
 .../hdds/scm/server/StorageContainerManager.java   |  13 +-
 .../container/TestCloseContainerEventHandler.java  |   2 +-
 .../scm/container/TestSCMContainerManager.java     |   2 +-
 .../hdds/scm/node/TestContainerPlacement.java      |  10 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |   2 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |   6 +-
 .../TestOneReplicaPipelineSafeModeRule.java        |   2 +-
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  10 +-
 .../ozone/client/OzoneMultipartUploadList.java     |   1 -
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  54 ++------
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   1 -
 .../ozone/om/helpers/OmMultipartKeyInfo.java       |   3 -
 .../ozone/om/helpers/OmMultipartUploadList.java    |   3 -
 .../om/helpers/OmMultipartUploadListParts.java     |   1 -
 ...MultipartUploadList.java => ServiceInfoEx.java} |  30 ++---
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   9 +-
 ...OzoneManagerProtocolClientSideTranslatorPB.java |  21 +++-
 .../src/main/proto/OzoneManagerProtocol.proto      |   3 +
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  15 ++-
 .../hadoop/ozone/TestStorageContainerManager.java  |  23 ++--
 .../ozone/client/CertificateClientTestImpl.java    |   7 +-
 .../rpc/TestContainerReplicationEndToEnd.java      |  10 +-
 .../ozoneimpl/TestOzoneContainerWithTLS.java       | 104 +++++++++-------
 .../hadoop/ozone/scm/TestXceiverClientManager.java |   6 +-
 .../java/org/apache/hadoop/ozone/om/OMMetrics.java |   1 -
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  11 ++
 .../protocolPB/OzoneManagerRequestHandler.java     |   6 +-
 48 files changed, 375 insertions(+), 428 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index d8daaa7..b31da05 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -52,8 +52,8 @@ import 
org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
+import java.security.cert.X509Certificate;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -80,6 +80,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   private boolean closed = false;
   private SecurityConfig secConfig;
   private final boolean topologyAwareRead;
+  private X509Certificate caCert;
 
   /**
    * Constructs a client that can communicate with the Container framework on
@@ -87,8 +88,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
    *
    * @param pipeline - Pipeline that defines the machines.
    * @param config   -- Ozone Config
+   * @param caCert   - SCM ca certificate.
    */
-  public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
+  public XceiverClientGrpc(Pipeline pipeline, Configuration config,
+      X509Certificate caCert) {
     super();
     Preconditions.checkNotNull(pipeline);
     Preconditions.checkNotNull(config);
@@ -103,6 +106,18 @@ public class XceiverClientGrpc extends XceiverClientSpi {
     this.topologyAwareRead = config.getBoolean(
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY,
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
+    this.caCert = caCert;
+  }
+
+  /**
+   * Constructs a client that can communicate with the Container framework on
+   * data nodes.
+   *
+   * @param pipeline - Pipeline that defines the machines.
+   * @param config   -- Ozone Config
+   */
+  public XceiverClientGrpc(Pipeline pipeline, Configuration config) {
+    this(pipeline, config, null);
   }
 
   /**
@@ -151,19 +166,10 @@ public class XceiverClientGrpc extends XceiverClientSpi {
             .intercept(new ClientCredentialInterceptor(userName, encodedToken),
                 new GrpcClientInterceptor());
     if (secConfig.isGrpcTlsEnabled()) {
-      File trustCertCollectionFile = secConfig.getTrustStoreFile(COMPONENT);
-      File privateKeyFile = secConfig.getClientPrivateKeyFile(COMPONENT);
-      File clientCertChainFile = secConfig.getClientCertChainFile(COMPONENT);
-
       SslContextBuilder sslContextBuilder = GrpcSslContexts.forClient();
-      if (trustCertCollectionFile != null) {
-        sslContextBuilder.trustManager(trustCertCollectionFile);
-      }
-      if (secConfig.isGrpcMutualTlsRequired() && clientCertChainFile != null
-          && privateKeyFile != null) {
-        sslContextBuilder.keyManager(clientCertChainFile, privateKeyFile);
+      if (caCert != null) {
+        sslContextBuilder.trustManager(caCert);
       }
-
       if (secConfig.useTestCert()) {
         channelBuilder.overrideAuthority("localhost");
       }
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index f906ab6..ebed288 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -31,6 +31,8 @@ import org.apache.hadoop.hdds.conf.ConfigType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -39,6 +41,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.concurrent.Callable;
 import java.util.concurrent.TimeUnit;
 
@@ -65,6 +69,7 @@ public class XceiverClientManager implements Closeable {
   private final Configuration conf;
   private final Cache<String, XceiverClientSpi> clientCache;
   private final boolean useRatis;
+  private X509Certificate caCert;
 
   private static XceiverClientMetrics metrics;
   private boolean isSecurityEnabled;
@@ -74,11 +79,13 @@ public class XceiverClientManager implements Closeable {
    *
    * @param conf configuration
    */
-  public XceiverClientManager(Configuration conf) {
-    this(conf, OzoneConfiguration.of(conf).getObject(ScmClientConfig.class));
+  public XceiverClientManager(Configuration conf) throws IOException {
+    this(conf, OzoneConfiguration.of(conf).getObject(ScmClientConfig.class),
+        null);
   }
 
-  public XceiverClientManager(Configuration conf, ScmClientConfig clientConf) {
+  public XceiverClientManager(Configuration conf, ScmClientConfig clientConf,
+      String caCertPem) throws IOException {
     Preconditions.checkNotNull(clientConf);
     Preconditions.checkNotNull(conf);
     long staleThresholdMs = clientConf.getStaleThreshold(MILLISECONDS);
@@ -87,6 +94,16 @@ public class XceiverClientManager implements Closeable {
         ScmConfigKeys.DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
     this.conf = conf;
     this.isSecurityEnabled = OzoneSecurityUtil.isSecurityEnabled(conf);
+    if (isSecurityEnabled) {
+      Preconditions.checkNotNull(caCertPem);
+      try {
+        this.caCert = CertificateCodec.getX509Cert(caCertPem);
+      } catch (CertificateException ex) {
+        throw new SCMSecurityException("Error: Fail to get SCM CA certificate",
+            ex);
+      }
+    }
+
     this.clientCache = CacheBuilder.newBuilder()
         .expireAfterAccess(staleThresholdMs, MILLISECONDS)
         .maximumSize(clientConf.getMaxSize())
@@ -211,11 +228,12 @@ public class XceiverClientManager implements Closeable {
             XceiverClientSpi client = null;
             switch (type) {
             case RATIS:
-              client = XceiverClientRatis.newXceiverClientRatis(pipeline, 
conf);
+              client = XceiverClientRatis.newXceiverClientRatis(pipeline, conf,
+                  caCert);
               client.connect();
               break;
             case STAND_ALONE:
-              client = new XceiverClientGrpc(pipeline, conf);
+              client = new XceiverClientGrpc(pipeline, conf, caCert);
               break;
             case CHAINED:
             default:
diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 3d83675..d234a3f 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm;
 
 import java.io.IOException;
+import java.security.cert.X509Certificate;
 import java.util.Collection;
 import java.util.List;
 import java.util.Objects;
@@ -78,6 +79,12 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
   public static XceiverClientRatis newXceiverClientRatis(
       org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
       Configuration ozoneConf) {
+    return newXceiverClientRatis(pipeline, ozoneConf, null);
+  }
+
+  public static XceiverClientRatis newXceiverClientRatis(
+      org.apache.hadoop.hdds.scm.pipeline.Pipeline pipeline,
+      Configuration ozoneConf, X509Certificate caCert) {
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@@ -87,7 +94,7 @@ public final class XceiverClientRatis extends 
XceiverClientSpi {
         HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
     final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
-        SecurityConfig(ozoneConf));
+        SecurityConfig(ozoneConf), caCert);
     return new XceiverClientRatis(pipeline,
         SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
         retryPolicy, tlsConfig, clientRequestTimeout);
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 394eed7..99972ae 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -176,34 +176,18 @@ public final class HddsConfigKeys {
   private HddsConfigKeys() {
   }
 
+  // Enable TLS for GRPC clients/server in ozone.
   public static final String HDDS_GRPC_TLS_ENABLED = "hdds.grpc.tls.enabled";
   public static final boolean HDDS_GRPC_TLS_ENABLED_DEFAULT = false;
 
-  public static final String HDDS_GRPC_MUTUAL_TLS_REQUIRED =
-      "hdds.grpc.mutual.tls.required";
-  public static final boolean HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT = false;
-
+  // Choose TLS provider the default is set to OPENSSL for better performance.
   public static final String HDDS_GRPC_TLS_PROVIDER = "hdds.grpc.tls.provider";
   public static final String HDDS_GRPC_TLS_PROVIDER_DEFAULT = "OPENSSL";
 
-  public static final String HDDS_TRUST_STORE_FILE_NAME =
-      "hdds.trust.cert.collection.file.name";
-  public static final String HDDS_TRUST_STORE_FILE_NAME_DEFAULT = "ca.crt";
-
-  public static final String
-      HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME =
-      "hdds.server.cert.chain.file.name";
-  public static final String
-      HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT = "server.crt";
-
-  public static final String
-      HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME =
-      "hdds.client.cert.chain.file.name";
-  public static final String
-      HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT = "client.crt";
-
+  // Test only settings for using test signed certificate, authority assumed
+  // to be localhost.
   public static final String HDDS_GRPC_TLS_TEST_CERT = "hdds.grpc.tls" +
-      ".test_cert";
+      ".test.cert";
   public static final boolean HDDS_GRPC_TLS_TEST_CERT_DEFAULT = false;
 
   // Comma separated acls (users, groups) allowing clients accessing
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index ea73a28..3ad4e2e 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.hdds.ratis;
 
 import java.io.IOException;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -31,7 +33,11 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateServer;
+import 
org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
 
@@ -200,29 +206,47 @@ public interface RatisHelper {
     return builder.build();
   }
 
-  static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf) {
-    if (conf.isGrpcTlsEnabled()) {
-      if (conf.isGrpcMutualTlsRequired()) {
-        return new GrpcTlsConfig(conf.getClientPrivateKeyFile(),
-            conf.getClientCertChainFile(), conf.getTrustStoreFile(), true);
-      } else {
-        return new GrpcTlsConfig(
-            null, null, conf.getTrustStoreFile(), false);
+  // For External gRPC client to server with gRPC TLS.
+  // No mTLS for external client as SCM CA does not issue certificates for them
+  static GrpcTlsConfig createTlsClientConfig(SecurityConfig conf,
+      X509Certificate caCert) {
+    GrpcTlsConfig tlsConfig = null;
+    if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
+      tlsConfig = new GrpcTlsConfig(null, null,
+          caCert, false);
+    }
+    return tlsConfig;
+  }
+
+  // For Internal gRPC client from SCM to DN with gRPC TLS
+  static GrpcTlsConfig createTlsClientConfigForSCM(SecurityConfig conf,
+      CertificateServer certificateServer) throws IOException {
+    if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
+      try {
+        X509Certificate caCert =
+            CertificateCodec.getX509Certificate(
+                certificateServer.getCACertificate());
+        return new GrpcTlsConfig(null, null,
+            caCert, false);
+      } catch (CertificateException ex) {
+        throw new SCMSecurityException("Fail to find SCM CA certificate.", ex);
       }
     }
     return null;
   }
 
-  static GrpcTlsConfig createTlsServerConfig(SecurityConfig conf) {
-    if (conf.isGrpcTlsEnabled()) {
-      if (conf.isGrpcMutualTlsRequired()) {
-        return new GrpcTlsConfig(
-            conf.getServerPrivateKeyFile(), conf.getServerCertChainFile(), 
null,
-            false);
-      } else {
-        return new GrpcTlsConfig(conf.getServerPrivateKeyFile(),
-            conf.getServerCertChainFile(), conf.getClientCertChainFile(), 
true);
-      }
+  // For gRPC server running DN container service with gRPC TLS
+  // No mTLS as the channel is shared for external client, which
+  // does not have SCM CA issued certificates.
+  // In summary:
+  // authentication from server to client is via TLS.
+  // authentication from client to server is via block token (or container token).
+  static GrpcTlsConfig createTlsServerConfigForDN(SecurityConfig conf,
+      CertificateClient caClient)  {
+    if (conf.isSecurityEnabled() && conf.isGrpcTlsEnabled()) {
+      return new GrpcTlsConfig(
+          caClient.getPrivateKey(), caClient.getCertificate(),
+          null, false);
     }
     return null;
   }
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
index 969f7bb..8aaba5d 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/SecurityConfig.java
@@ -20,7 +20,6 @@
 package org.apache.hadoop.hdds.security.x509;
 
 import com.google.common.base.Preconditions;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslProvider;
@@ -28,7 +27,6 @@ import org.bouncycastle.jce.provider.BouncyCastleProvider;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.security.Provider;
@@ -47,14 +45,6 @@ import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_PROVIDER_DEFAULT;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT_DEFAULT;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_MUTUAL_TLS_REQUIRED;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_TRUST_STORE_FILE_NAME;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_TRUST_STORE_FILE_NAME_DEFAULT;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME;
-import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_ALGORITHM;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME;
@@ -106,12 +96,8 @@ public class SecurityConfig {
   private final String certificateFileName;
   private final boolean grpcTlsEnabled;
   private boolean grpcTlsUseTestCert;
-  private String trustStoreFileName;
-  private String serverCertChainFileName;
-  private String clientCertChainFileName;
   private final Duration defaultCertDuration;
   private final boolean isSecurityEnabled;
-  private boolean grpcMutualTlsRequired;
 
   /**
    * Constructs a SecurityConfig.
@@ -158,20 +144,6 @@ public class SecurityConfig {
         HDDS_GRPC_TLS_ENABLED_DEFAULT);
 
     if (grpcTlsEnabled) {
-      this.grpcMutualTlsRequired = configuration.getBoolean(
-          HDDS_GRPC_MUTUAL_TLS_REQUIRED, 
HDDS_GRPC_MUTUAL_TLS_REQUIRED_DEFAULT);
-
-      this.trustStoreFileName = this.configuration.get(
-          HDDS_TRUST_STORE_FILE_NAME, HDDS_TRUST_STORE_FILE_NAME_DEFAULT);
-
-      this.clientCertChainFileName = this.configuration.get(
-          HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME,
-          HDDS_CLIENT_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT);
-
-      this.serverCertChainFileName = this.configuration.get(
-          HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME,
-          HDDS_SERVER_CERTIFICATE_CHAIN_FILE_NAME_DEFAULT);
-
       this.grpcTlsUseTestCert = this.configuration.getBoolean(
           HDDS_GRPC_TLS_TEST_CERT, HDDS_GRPC_TLS_TEST_CERT_DEFAULT);
     }
@@ -352,115 +324,6 @@ public class SecurityConfig {
   }
 
   /**
-   * Returns true if TLS mutual authentication is enabled for gRPC services.
-   * @return true if TLS is enabled for gRPC services.
-   */
-  public boolean isGrpcMutualTlsRequired() {
-    return this.grpcMutualTlsRequired;
-  }
-
-  /**
-   * Returns the TLS-enabled gRPC client private key file(Only needed for 
mutual
-   * authentication) for the given component.
-   * @param component name of the component.
-   * @return the TLS-enabled gRPC client private key file.
-   */
-  public File getClientPrivateKeyFile(String component) {
-    return Paths.get(getKeyLocation(component).toString(),
-        "client." + privateKeyFileName).toFile();
-  }
-
-  /**
-   * Returns the TLS-enabled gRPC client private key file(Only needed for 
mutual
-   * authentication).
-   * @return the TLS-enabled gRPC client private key file.
-   */
-  public File getClientPrivateKeyFile() {
-    return getClientPrivateKeyFile(StringUtils.EMPTY);
-  }
-
-  /**
-   * Returns the TLS-enabled gRPC server private key file for the given
-   * component.
-   * @param component name of the component.
-   * @return the TLS-enabled gRPC server private key file.
-   */
-  public File getServerPrivateKeyFile(String component) {
-    return Paths.get(getKeyLocation(component).toString(),
-        "server." + privateKeyFileName).toFile();
-  }
-
-  /**
-   * Returns the TLS-enabled gRPC server private key file.
-   * @return the TLS-enabled gRPC server private key file.
-   */
-  public File getServerPrivateKeyFile() {
-    return getServerPrivateKeyFile(StringUtils.EMPTY);
-  }
-
-  /**
-   * Get the trusted CA certificate file for the given component. (CA
-   * certificate)
-   * @param component name of the component.
-   * @return the trusted CA certificate.
-   */
-  public File getTrustStoreFile(String component) {
-    return Paths.get(getKeyLocation(component).toString(),
-        trustStoreFileName).
-        toFile();
-  }
-
-  /**
-   * Get the trusted CA certificate file. (CA certificate)
-   * @return the trusted CA certificate.
-   */
-  public File getTrustStoreFile() {
-    return getTrustStoreFile(StringUtils.EMPTY);
-  }
-
-  /**
-   * Get the TLS-enabled gRPC Client certificate chain file for the given
-   * component (only needed for
-   * mutual authentication).
-   * @param component name of the component.
-   * @return the TLS-enabled gRPC Server certificate chain file.
-   */
-  public File getClientCertChainFile(String component) {
-    return Paths.get(getKeyLocation(component).toString(),
-        clientCertChainFileName).
-        toFile();
-  }
-
-  /**
-   * Get the TLS-enabled gRPC Client certificate chain file (only needed for
-   * mutual authentication).
-   * @return the TLS-enabled gRPC Server certificate chain file.
-   */
-  public File getClientCertChainFile() {
-    return getClientCertChainFile(StringUtils.EMPTY);
-  }
-
-  /**
-   * Get the TLS-enabled gRPC Server certificate chain file for the given
-   * component.
-   * @param component name of the component.
-   * @return the TLS-enabled gRPC Server certificate chain file.
-   */
-  public File getServerCertChainFile(String component) {
-    return Paths.get(getKeyLocation(component).toString(),
-        serverCertChainFileName).
-        toFile();
-  }
-
-  /**
-   * Get the TLS-enabled gRPC Server certificate chain file.
-   * @return the TLS-enabled gRPC Server certificate chain file.
-   */
-  public File getServerCertChainFile() {
-    return getServerCertChainFile(StringUtils.EMPTY);
-  }
-
-  /**
    * Get the gRPC TLS provider.
    * @return the gRPC TLS Provider.
    */
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index c36c9e0..34b4930 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -70,6 +70,12 @@ public interface CertificateClient {
   X509Certificate getCertificate();
 
   /**
+   * Return the latest CA certificate known to the client.
+   * @return latest ca certificate known to the client.
+   */
+  X509Certificate getCACertificate();
+
+  /**
    * Verifies if this certificate is part of a trusted chain.
    * @param certificate - certificate.
    * @return true if it trusted, false otherwise.
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 388c5bc..ff99e08 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -20,7 +20,9 @@
 package org.apache.hadoop.hdds.security.x509.certificate.client;
 
 import com.google.common.base.Preconditions;
+import org.apache.commons.io.FilenameUtils;
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.commons.lang3.math.NumberUtils;
 import org.apache.commons.validator.routines.DomainValidator;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
@@ -81,6 +83,7 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
 
   private static final String CERT_FILE_NAME_FORMAT = "%s.crt";
   private static final String CA_CERT_PREFIX = "CA-";
+  private static final int CA_CERT_PREFIX_LEN = 3;
   private final Logger logger;
   private final SecurityConfig securityConfig;
   private final KeyCodec keyCodec;
@@ -89,9 +92,9 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
   private X509Certificate x509Certificate;
   private Map<String, X509Certificate> certificateMap;
   private String certSerialId;
+  private String caCertId;
   private String component;
 
-
   DefaultCertificateClient(SecurityConfig securityConfig, Logger log,
       String certSerialId, String component) {
     Objects.requireNonNull(securityConfig);
@@ -119,6 +122,7 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
       if (certFiles != null) {
         CertificateCodec certificateCodec =
             new CertificateCodec(securityConfig, component);
+        long latestCaCertSerailId = -1L;
         for (File file : certFiles) {
           if (file.isFile()) {
             try {
@@ -132,6 +136,15 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
                 }
                 certificateMap.putIfAbsent(cert.getSerialNumber().toString(),
                     cert);
+                if (file.getName().startsWith(CA_CERT_PREFIX)) {
+                  String certFileName = FilenameUtils.getBaseName(
+                      file.getName());
+                  long tmpCaCertSerailId = NumberUtils.toLong(
+                      certFileName.substring(CA_CERT_PREFIX_LEN));
+                  if (tmpCaCertSerailId > latestCaCertSerailId) {
+                    latestCaCertSerailId = tmpCaCertSerailId;
+                  }
+                }
                 getLogger().info("Added certificate from file:{}.",
                     file.getAbsolutePath());
               } else {
@@ -144,6 +157,9 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
             }
           }
         }
+        if (latestCaCertSerailId != -1) {
+          caCertId = Long.toString(latestCaCertSerailId);
+        }
       }
     }
   }
@@ -222,6 +238,18 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
   }
 
   /**
+   * Return the latest CA certificate known to the client.
+   * @return latest ca certificate known to the client.
+   */
+  @Override
+  public X509Certificate getCACertificate() {
+    if (caCertId != null) {
+      return certificateMap.get(caCertId);
+    }
+    return null;
+  }
+
+  /**
    * Returns the certificate  with the specified certificate serial id if it
    * exists else try to get it from SCM.
    * @param  certId
@@ -491,6 +519,7 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
 
       if(caCert) {
         certName = CA_CERT_PREFIX + certName;
+        caCertId = cert.getSerialNumber().toString();
       }
 
       certificateCodec.writeCertificate(basePath, certName,
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml 
b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 82307a4..9e4c5ea 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1838,39 +1838,12 @@
     <description>HDDS GRPC server TLS provider.</description>
   </property>
   <property>
-    <name>hdds.client.cert.chain.file.name</name>
-    <value>client.crt</value>
-    <tag>OZONE, HDDS, SECURITY</tag>
-    <description>Client certificate file name. It is an optional
-      field only required when mutual TLS (hdds.grpc.mutual.tls.required)
-      is set to true .</description>
-  </property>
-  <property>
-    <name>hdds.grpc.mutual.tls.required</name>
-    <value>false</value>
-    <tag>OZONE, HDDS, SECURITY, TLS</tag>
-    <description>If mutual tls check is enabled for GRPC.
-    Considered only if hdds.grpc.tls.enabled is set to true.</description>
-  </property>
-  <property>
     <name>hdds.grpc.tls.enabled</name>
     <value>false</value>
     <tag>OZONE, HDDS, SECURITY, TLS</tag>
     <description>If HDDS GRPC server TLS is enabled.</description>
   </property>
   <property>
-    <name>hdds.server.cert.chain.file.name</name>
-    <value>server.crt</value>
-    <tag>OZONE, HDDS, SECURITY</tag>
-    <description>Hdds server certificate file name.</description>
-  </property>
-  <property>
-    <name>hdds.trust.cert.collection.file.name</name>
-    <value>ca.crt</value>
-    <tag>OZONE, HDDS, SECURITY</tag>
-    <description>HDDS Certificate Authority trust store file 
name.</description>
-  </property>
-  <property>
     <name>hdds.x509.default.duration</name>
     <value>P365D</value>
     <tag>OZONE, HDDS, SECURITY</tag>
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
index 2879001..9ea4adf 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.metrics2.lib.MutableRate;
 @InterfaceAudience.Private
 @Metrics(about="Storage Container DataNode Metrics", context="dfs")
 public class ContainerMetrics {
+  public static final String STORAGE_CONTAINER_METRICS =
+      "StorageContainerMetrics";
   @Metric private MutableCounterLong numOps;
   private MutableCounterLong[] numOpsArray;
   private MutableCounterLong[] opsBytesArray;
@@ -89,11 +91,16 @@ public class ContainerMetrics {
     // Percentile measurement is off by default, by watching no intervals
     int[] intervals =
              conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
-    return ms.register("StorageContainerMetrics",
+    return ms.register(STORAGE_CONTAINER_METRICS,
                        "Storage Container Node Metrics",
                        new ContainerMetrics(intervals));
   }
 
+  public static void remove() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(STORAGE_CONTAINER_METRICS);
+  }
+
   public void incContainerOpsMetrics(ContainerProtos.Type type) {
     numOps.incr();
     numOpsArray[type.ordinal()].incr();
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 23fa2d0..bb352ea 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -45,12 +45,10 @@ import org.apache.ratis.thirdparty.io.grpc.ServerBuilder;
 import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
-import org.apache.ratis.thirdparty.io.netty.handler.ssl.ClientAuth;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -112,21 +110,9 @@ public final class XceiverServerGrpc extends XceiverServer 
{
     }
 
     if (getSecConfig().isGrpcTlsEnabled()) {
-      File privateKeyFilePath =
-          getSecurityConfig().getServerPrivateKeyFile(COMPONENT);
-      File serverCertChainFilePath =
-          getSecurityConfig().getServerCertChainFile(COMPONENT);
-      File clientCertChainFilePath =
-          getSecurityConfig().getClientCertChainFile(COMPONENT);
       try {
         SslContextBuilder sslClientContextBuilder = 
SslContextBuilder.forServer(
-            serverCertChainFilePath, privateKeyFilePath);
-        if (getSecurityConfig().isGrpcMutualTlsRequired() &&
-            clientCertChainFilePath != null) {
-          // Only needed for mutual TLS
-          sslClientContextBuilder.clientAuth(ClientAuth.REQUIRE);
-          sslClientContextBuilder.trustManager(clientCertChainFilePath);
-        }
+            caClient.getPrivateKey(), caClient.getCertificate());
         SslContextBuilder sslContextBuilder = GrpcSslContexts.configure(
             sslClientContextBuilder, getSecurityConfig().getGrpcSslProvider());
         nettyServerBuilder.sslContext(sslContextBuilder.build());
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index e521fb4..746bfb8 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -398,8 +398,8 @@ public final class XceiverServerRatis extends XceiverServer 
{
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_RANDOM_PORT_DEFAULT)) {
       localPort = 0;
     }
-    GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfig(
-          new SecurityConfig(ozoneConf));
+    GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN(
+          new SecurityConfig(ozoneConf), caClient);
 
     return new XceiverServerRatis(datanodeDetails, localPort, dispatcher,
         containerController, context, tlsConfig, caClient, ozoneConf);
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 209a8e3..d52cf8c 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -236,6 +236,7 @@ public class OzoneContainer {
     hddsDispatcher.shutdown();
     volumeSet.shutdown();
     blockDeletingService.shutdown();
+    ContainerMetrics.remove();
   }
 
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ProfileServlet.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ProfileServlet.java
index 42944e1..016445c 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ProfileServlet.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/ProfileServlet.java
@@ -348,7 +348,6 @@ public class ProfileServlet extends HttpServlet {
       final HttpServletResponse resp)
       throws IOException {
 
-    ;
     String safeFileName = validateFileName(fileName);
     File requestedFile =
         ProfileServlet.OUTPUT_DIR
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/TestProfileServlet.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/TestProfileServlet.java
index c77fee0..1c4adf6 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/TestProfileServlet.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/TestProfileServlet.java
@@ -17,20 +17,11 @@
  */
 package org.apache.hadoop.hdds.server;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 
 import org.apache.hadoop.hdds.server.ProfileServlet.Event;
 import org.apache.hadoop.hdds.server.ProfileServlet.Output;
-import org.apache.hadoop.metrics2.MetricsSystem;
-import org.apache.hadoop.metrics2.annotation.Metric;
-import org.apache.hadoop.metrics2.annotation.Metrics;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.lib.MutableCounterLong;
-
-import static java.nio.charset.StandardCharsets.UTF_8;
-import org.junit.Assert;
+
 import org.junit.Test;
 
 /**
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index cec688c..77e037a 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -38,12 +39,12 @@ public final class PipelineFactory {
   private Map<ReplicationType, PipelineProvider> providers;
 
   PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
-      Configuration conf) {
+      Configuration conf, GrpcTlsConfig tlsConfig) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
         new SimplePipelineProvider(nodeManager));
     providers.put(ReplicationType.RATIS,
-        new RatisPipelineProvider(nodeManager, stateManager, conf));
+        new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
   }
 
   @VisibleForTesting
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index bd8fa2d..9ba5f31 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -94,4 +95,5 @@ public interface PipelineManager extends Closeable, 
PipelineManagerMXBean {
    */
   void deactivatePipeline(PipelineID pipelineID) throws IOException;
 
+  GrpcTlsConfig getGrpcTlsConfig();
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 1bba45d..2b11da9 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -97,7 +97,8 @@ public class PipelineReportHandler implements
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf);
+      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
+          pipelineManager.getGrpcTlsConfig());
       return;
     }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9e22733..a5e3d37 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -29,7 +29,6 @@ import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
 import 
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
@@ -84,13 +83,15 @@ public class RatisPipelineProvider implements 
PipelineProvider {
 
   private final ForkJoinPool forkJoinPool = new ForkJoinPool(
       parallelismForPool, factory, null, false);
-
+  private final GrpcTlsConfig tlsConfig;
 
   RatisPipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager, Configuration conf) {
+      PipelineStateManager stateManager, Configuration conf,
+      GrpcTlsConfig tlsConfig) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
+    this.tlsConfig = tlsConfig;
   }
 
 
@@ -217,8 +218,6 @@ public class RatisPipelineProvider implements 
PipelineProvider {
         Collections.synchronizedList(new ArrayList<>());
     final int maxOutstandingRequests =
         HddsClientUtils.getMaxOutstandingRequests(conf);
-    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
-        SecurityConfig(conf));
     final TimeDuration requestTimeout =
         RatisHelper.getClientRequestTimeout(conf);
     try {
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index d9aec34..777a0b0 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -23,7 +23,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.grpc.GrpcTlsConfig;
@@ -54,14 +53,16 @@ public final class RatisPipelineUtils {
    *
    * @param pipeline        - Pipeline to be destroyed
    * @param ozoneConf       - Ozone configuration
+   * @param grpcTlsConfig
    * @throws IOException
    */
-  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf) {
+  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
+      GrpcTlsConfig grpcTlsConfig) {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
     LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
     for (DatanodeDetails dn : pipeline.getNodes()) {
       try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf);
+        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
       } catch (IOException e) {
         LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
             pipeline.getId(), dn);
@@ -75,10 +76,11 @@ public final class RatisPipelineUtils {
    * @param dn         - Datanode on which pipeline needs to be destroyed
    * @param pipelineID - ID of pipeline to be destroyed
    * @param ozoneConf  - Ozone configuration
+   * @param grpcTlsConfig - grpc tls configuration
    * @throws IOException
    */
   static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      Configuration ozoneConf) throws IOException {
+      Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException 
{
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
@@ -86,13 +88,12 @@ public final class RatisPipelineUtils {
     final RaftPeer p = RatisHelper.toRaftPeer(dn);
     final int maxOutstandingRequests =
         HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
-    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(
-        new SecurityConfig(ozoneConf));
     final TimeDuration requestTimeout =
         RatisHelper.getClientRequestTimeout(ozoneConf);
     try(RaftClient client = RatisHelper
         .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
+            retryPolicy, maxOutstandingRequests, grpcTlsConfig,
+            requestTimeout)) {
       client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
           true, p.getId());
     }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 20b9350..0964f6d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.MetadataStore;
 import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
 import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -82,14 +83,16 @@ public class SCMPipelineManager implements PipelineManager {
   private final Configuration conf;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
+  private GrpcTlsConfig grpcTlsConfig;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
-      EventPublisher eventPublisher) throws IOException {
+      EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+      throws IOException {
     this.lock = new ReentrantReadWriteLock();
     this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
-        conf);
+        conf, grpcTlsConfig);
     // TODO: See if thread priority needs to be set for these threads
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
@@ -111,6 +114,7 @@ public class SCMPipelineManager implements PipelineManager {
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
     initializePipelineState();
+    this.grpcTlsConfig = grpcTlsConfig;
   }
 
   public PipelineStateManager getStateManager() {
@@ -404,7 +408,7 @@ public class SCMPipelineManager implements PipelineManager {
    * @throws IOException
    */
   private void destroyPipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.destroyPipeline(pipeline, conf);
+    RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
     // remove the pipeline from the pipeline manager
     removePipeline(pipeline.getId());
     triggerPipelineCreation();
@@ -437,6 +441,11 @@ public class SCMPipelineManager implements PipelineManager 
{
   }
 
   @Override
+  public GrpcTlsConfig getGrpcTlsConfig() {
+    return grpcTlsConfig;
+  }
+
+  @Override
   public void close() throws IOException {
     if (scheduler != null) {
       scheduler.close();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 3502c85..4ecab37 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.block.BlockManager;
@@ -100,6 +101,7 @@ import 
org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import 
org.apache.hadoop.security.authentication.client.AuthenticationException;
 import org.apache.hadoop.util.JvmPauseMonitor;
 import org.apache.hadoop.hdds.utils.HddsVersionInfo;
+import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -186,6 +188,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
 
   private SCMSafeModeManager scmSafeModeManager;
   private CertificateServer certificateServer;
+  private GrpcTlsConfig grpcTlsConfig;
 
   private JvmPauseMonitor jvmPauseMonitor;
   private final OzoneConfiguration configuration;
@@ -399,7 +402,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
       pipelineManager = configurator.getPipelineManager();
     } else {
       pipelineManager =
-          new SCMPipelineManager(conf, scmNodeManager, eventQueue);
+          new SCMPipelineManager(conf, scmNodeManager, eventQueue,
+              grpcTlsConfig);
     }
 
     if (configurator.getContainerManager() != null) {
@@ -443,8 +447,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
    * @throws AuthenticationException - on Failure
    */
   private void initializeCAnSecurityProtocol(OzoneConfiguration conf,
-                                             SCMConfigurator configurator)
-      throws IOException {
+      SCMConfigurator configurator) throws IOException {
     if(configurator.getCertificateServer() != null) {
       this.certificateServer = configurator.getCertificateServer();
     } else {
@@ -458,6 +461,10 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
         CertificateServer.CAType.SELF_SIGNED_CA);
     securityProtocolServer = new SCMSecurityProtocolServer(conf,
         certificateServer);
+
+    grpcTlsConfig = RatisHelper
+        .createTlsClientConfigForSCM(new SecurityConfig(conf),
+            certificateServer);
   }
 
   /**
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index f7a5df7..a8364a4 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -67,7 +67,7 @@ public class TestCloseContainerEventHandler {
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(configuration, nodeManager, eventQueue);
+        new SCMPipelineManager(configuration, nodeManager, eventQueue, null);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), configuration);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index bfdeac5..75a1ad3 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -94,7 +94,7 @@ public class TestSCMContainerManager {
     }
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
     containerManager = new SCMContainerManager(conf, nodeManager,
         pipelineManager, new EventQueue());
     xceiverClientManager = new XceiverClientManager(conf);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index ec0c4c3..26ffd8d 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -65,8 +65,6 @@ import static org.junit.Assert.assertEquals;
 public class TestContainerPlacement {
   @Rule
   public ExpectedException thrown = ExpectedException.none();
-  private static XceiverClientManager xceiverClientManager =
-      new XceiverClientManager(new OzoneConfiguration());
 
   /**
    * Returns a new copy of Configuration.
@@ -109,7 +107,7 @@ public class TestContainerPlacement {
     final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     PipelineManager pipelineManager =
-        new SCMPipelineManager(config, scmNodeManager, eventQueue);
+        new SCMPipelineManager(config, scmNodeManager, eventQueue, null);
     return new SCMContainerManager(config, scmNodeManager, pipelineManager,
         eventQueue);
 
@@ -144,6 +142,7 @@ public class TestContainerPlacement {
         createContainerManager(conf, nodeManager);
     List<DatanodeDetails> datanodes =
         TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount);
+    XceiverClientManager xceiverClientManager = null;
     try {
       for (DatanodeDetails datanodeDetails : datanodes) {
         nodeManager.processHeartbeat(datanodeDetails);
@@ -159,6 +158,8 @@ public class TestContainerPlacement {
       assertEquals(remaining * nodeCount,
           (long) nodeManager.getStats().getRemaining().get());
 
+      xceiverClientManager= new XceiverClientManager(new OzoneConfiguration());
+
       ContainerInfo container = containerManager
           .allocateContainer(
           xceiverClientManager.getType(),
@@ -169,6 +170,9 @@ public class TestContainerPlacement {
     } finally {
       IOUtils.closeQuietly(containerManager);
       IOUtils.closeQuietly(nodeManager);
+      if (xceiverClientManager != null) {
+        xceiverClientManager.close();
+      }
       FileUtil.fullyDelete(testDir);
     }
   }
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 32784a3..01c53ba 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -31,7 +31,7 @@ public class MockRatisPipelineProvider extends 
RatisPipelineProvider {
   public MockRatisPipelineProvider(NodeManager nodeManager,
                             PipelineStateManager stateManager,
                             Configuration conf) {
-    super(nodeManager, stateManager, conf);
+    super(nodeManager, stateManager, conf, null);
   }
 
   protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index eb1f88b..94c3039 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -71,7 +71,7 @@ public class TestHealthyPipelineSafeModeRule {
 
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue);
+          nodeManager, eventQueue, null);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -116,7 +116,7 @@ public class TestHealthyPipelineSafeModeRule {
 
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue);
+          nodeManager, eventQueue, null);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -191,7 +191,7 @@ public class TestHealthyPipelineSafeModeRule {
 
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue);
+          nodeManager, eventQueue, null);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 99677d6..ca54d05 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -71,7 +71,7 @@ public class TestOneReplicaPipelineSafeModeRule {
     eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
-            eventQueue);
+            eventQueue, null);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
diff --git 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 7ddf84e..ba92035 100644
--- 
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ 
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -197,7 +197,7 @@ public class TestSCMSafeModeManager {
           0.9);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue);
+          mockNodeManager, queue, null);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -215,7 +215,7 @@ public class TestSCMSafeModeManager {
           200);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue);
+          mockNodeManager, queue, null);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -232,7 +232,7 @@ public class TestSCMSafeModeManager {
       conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue);
+          mockNodeManager, queue, null);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForSafeModePercent");
@@ -256,7 +256,7 @@ public class TestSCMSafeModeManager {
 
     MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
     SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
-        mockNodeManager, queue);
+        mockNodeManager, queue, null);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
             pipelineManager.getStateManager(), config);
@@ -477,7 +477,7 @@ public class TestSCMSafeModeManager {
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, queue);
+          nodeManager, queue, null);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneMultipartUploadList.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneMultipartUploadList.java
index 971d866..38377eb 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneMultipartUploadList.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/OzoneMultipartUploadList.java
@@ -18,7 +18,6 @@
 
 package org.apache.hadoop.ozone.client;
 
-import java.util.ArrayList;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index bd01aaf..202e0eb 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -32,14 +32,11 @@ import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ChecksumType;
 import org.apache.hadoop.hdds.scm.ByteStringHelper;
 import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.OzoneSecurityUtil;
 import org.apache.hadoop.ozone.client.*;
 import org.apache.hadoop.hdds.client.OzoneQuota;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
@@ -68,21 +65,14 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.OzoneAclUtil;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
-import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import org.apache.hadoop.ozone.om.protocolPB
     .OzoneManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
-import org.apache.hadoop.ozone.protocol.proto
-    .OzoneManagerProtocolProtos.ServicePort;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.XceiverClientManager;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.protocolPB
-    .StorageContainerLocationProtocolPB;
 import org.apache.hadoop.ozone.security.GDPRSymmetricKey;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLIdentityType;
@@ -102,7 +92,6 @@ import javax.crypto.Cipher;
 import javax.crypto.CipherInputStream;
 import javax.crypto.CipherOutputStream;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.InvalidKeyException;
 import java.util.*;
@@ -122,8 +111,6 @@ public class RpcClient implements ClientProtocol {
       LoggerFactory.getLogger(RpcClient.class);
 
   private final OzoneConfiguration conf;
-  private final StorageContainerLocationProtocol
-      storageContainerLocationClient;
   private final OzoneManagerProtocol ozoneManagerClient;
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
@@ -143,7 +130,7 @@ public class RpcClient implements ClientProtocol {
   private Text dtService;
   private final boolean topologyAwareReadEnabled;
 
-   /**
+  /**
     * Creates RpcClient instance with the given configuration.
     * @param conf Configuration
     * @param omServiceId OM HA Service ID, set this to null if not HA
@@ -163,21 +150,16 @@ public class RpcClient implements ClientProtocol {
             this.conf, clientId.toString(), omServiceId, ugi),
         OzoneManagerProtocol.class, conf
     );
-    long scmVersion =
-        RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
-    InetSocketAddress scmAddress = getScmAddressForClient();
-    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
-        ProtobufRpcEngine.class);
-
-    StorageContainerLocationProtocolClientSideTranslatorPB client =
-        new StorageContainerLocationProtocolClientSideTranslatorPB(
-            RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
-                scmAddress, ugi, conf, NetUtils.getDefaultSocketFactory(conf),
-                Client.getRpcTimeout(conf)));
-    this.storageContainerLocationClient =
-        TracingUtil.createProxy(client, StorageContainerLocationProtocol.class,
-            conf);
-    this.xceiverClientManager = new XceiverClientManager(conf);
+
+    ServiceInfoEx serviceInfoEx = ozoneManagerClient.getServiceInfo();
+    String caCertPem = null;
+    if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+      caCertPem = serviceInfoEx.getCaCertificate();
+    }
+
+    this.xceiverClientManager = new XceiverClientManager(conf,
+        OzoneConfiguration.of(conf).getObject(XceiverClientManager.
+            ScmClientConfig.class), caCertPem);
 
     int configuredChunkSize = (int) conf
         .getStorageSize(ScmConfigKeys.OZONE_SCM_CHUNK_SIZE_KEY,
@@ -245,15 +227,6 @@ public class RpcClient implements ClientProtocol {
         OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_DEFAULT);
   }
 
-  private InetSocketAddress getScmAddressForClient() throws IOException {
-    List<ServiceInfo> services = ozoneManagerClient.getServiceList();
-    ServiceInfo scmInfo = services.stream().filter(
-        a -> a.getNodeType().equals(HddsProtos.NodeType.SCM))
-        .collect(Collectors.toList()).get(0);
-    return NetUtils.createSocketAddr(
-        scmInfo.getServiceAddress(ServicePort.Type.RPC));
-  }
-
   @Override
   public void createVolume(String volumeName) throws IOException {
     createVolume(volumeName, VolumeArgs.newBuilder().build());
@@ -806,7 +779,6 @@ public class RpcClient implements ClientProtocol {
 
   @Override
   public void close() throws IOException {
-    IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
     IOUtils.cleanupWithLogger(LOG, ozoneManagerClient);
     IOUtils.cleanupWithLogger(LOG, xceiverClientManager);
   }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index b2760b3..cc908fc 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.common.BlockGroup;
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
index a3bacec..80123fd 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartKeyInfo.java
@@ -16,14 +16,11 @@
  */
 package org.apache.hadoop.ozone.om.helpers;
 
-import org.apache.hadoop.hdds.client.ReplicationFactor;
-import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .MultipartKeyInfo;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .PartKeyInfo;
 
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.TreeMap;
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
index 634f7ce9..0c13a0d 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
@@ -20,9 +20,6 @@ package org.apache.hadoop.ozone.om.helpers;
 
 import java.util.List;
 
-import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-
 /**
  * List of in-flight MPU uploads.
  */
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadListParts.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadListParts.java
index 2921b7b..ba0cd42 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadListParts.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadListParts.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.om.helpers;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .PartInfo;
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
similarity index 63%
copy from 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
copy to 
hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
index 634f7ce9..a90be63 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/OmMultipartUploadList.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/helpers/ServiceInfoEx.java
@@ -20,28 +20,28 @@ package org.apache.hadoop.ozone.om.helpers;
 
 import java.util.List;
 
-import org.apache.hadoop.hdds.client.ReplicationType;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-
 /**
- * List of in-flight MPU uploads.
+ * Wrapper class for service discovery, design for broader usage such as
+ * security, etc.
  */
-public class OmMultipartUploadList {
+public class ServiceInfoEx {
 
-  private List<OmMultipartUpload> uploads;
+  private List<ServiceInfo> infoList;
 
-  public OmMultipartUploadList(
-      List<OmMultipartUpload> uploads) {
-    this.uploads = uploads;
-  }
+  // PEM encoded string of SCM CA certificate.
+  private String caCertificate;
 
-  public List<OmMultipartUpload> getUploads() {
-    return uploads;
+  public ServiceInfoEx(List<ServiceInfo> infoList,
+      String caCertificate) {
+    this.infoList = infoList;
+    this.caCertificate = caCertificate;
   }
 
-  public void setUploads(
-      List<OmMultipartUpload> uploads) {
-    this.uploads = uploads;
+  public List<ServiceInfo> getServiceInfoList() {
+    return infoList;
   }
 
+  public String getCaCertificate() {
+    return caCertificate;
+  }
 }
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 7dce8e5..a236695 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -22,8 +22,6 @@ import 
org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.OzoneAcl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
-import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
@@ -31,15 +29,18 @@ import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
+import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
 
@@ -288,6 +289,8 @@ public interface OzoneManagerProtocol
    */
   List<ServiceInfo> getServiceList() throws IOException;
 
+  ServiceInfoEx getServiceInfo() throws IOException;
+
   /*
    * S3 Specific functionality that is supported by Ozone Manager.
    */
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index 5cd2709..c9dc8ec 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -55,9 +55,10 @@ import 
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
+import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
-import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.AddAclResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.GetAclRequest;
@@ -1211,6 +1212,24 @@ public final class 
OzoneManagerProtocolClientSideTranslatorPB
 
   }
 
+  @Override
+  public ServiceInfoEx getServiceInfo() throws IOException {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    OMRequest omRequest = createOMRequest(Type.ServiceList)
+        .setServiceListRequest(req)
+        .build();
+
+    final ServiceListResponse resp = handleError(submitRequest(omRequest))
+        .getServiceListResponse();
+
+    return new ServiceInfoEx(
+        resp.getServiceInfoList().stream()
+            .map(ServiceInfo::getFromProtobuf)
+            .collect(Collectors.toList()),
+        resp.getCaCertificate());
+  }
+
   /**
    * Get a valid Delegation Token.
    *
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto 
b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index 3baad5a..0fd02ce 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -878,6 +878,9 @@ message DBUpdatesRequest {
 message ServiceListResponse {
 
     repeated ServiceInfo serviceInfo = 2;
+    // When security is enabled, return SCM CA certificate to Ozone client
+    // to set up gRPC TLS for client to authenticate server(DN).
+    optional string caCertificate = 3;
 }
 
 message DBUpdatesResponse {
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index eebaa7d..2a486b1 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -76,7 +76,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testPipelineReload() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -93,7 +93,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should be able to load the pipelines from the db
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -116,7 +116,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testRemovePipeline() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -134,8 +134,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should not be able to load removed pipelines
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager,
-            new EventQueue());
+        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
     try {
       pipelineManager.getPipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been retrieved");
@@ -151,7 +150,7 @@ public class TestSCMPipelineManager {
   public void testPipelineReport() throws IOException {
     EventQueue eventQueue = new EventQueue();
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue);
+        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -218,7 +217,7 @@ public class TestSCMPipelineManager {
     MockNodeManager nodeManagerMock = new MockNodeManager(true,
         20);
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
+        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManagerMock,
             pipelineManager.getStateManager(), conf);
@@ -273,7 +272,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testActivateDeactivatePipeline() throws IOException {
     final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
     final PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index b8de587..d498200 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -83,6 +83,8 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.hdds.utils.HddsVersionInfo;
 import org.junit.Assert;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
@@ -90,8 +92,6 @@ import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 import org.mockito.ArgumentMatcher;
 import org.mockito.Mockito;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -100,12 +100,7 @@ import com.google.common.collect.Maps;
  * Test class that exercises the StorageContainerManager.
  */
 public class TestStorageContainerManager {
-  private static XceiverClientManager xceiverClientManager =
-      new XceiverClientManager(
-      new OzoneConfiguration());
-  private static final Logger LOG = LoggerFactory.getLogger(
-      TestStorageContainerManager.class);
-
+  private static XceiverClientManager xceiverClientManager;
   /**
    * Set the timeout for every test.
    */
@@ -121,6 +116,18 @@ public class TestStorageContainerManager {
   @Rule
   public TemporaryFolder folder= new TemporaryFolder();
 
+  @BeforeClass
+  public static void setup() throws IOException {
+    xceiverClientManager = new XceiverClientManager(new OzoneConfiguration());
+  }
+
+  @AfterClass
+  public static void cleanup() {
+    if (xceiverClientManager != null) {
+      xceiverClientManager.close();
+    }
+  }
+
   @Test
   public void testRpcPermission() throws Exception {
     // Test with default configuration
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
index 25bde38..d05093f 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
@@ -61,7 +61,7 @@ public class CertificateClientTestImpl implements 
CertificateClient {
             .setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
             .setClusterID("cluster1")
             .setKey(keyPair)
-            .setSubject("TestCertSub")
+            .setSubject("localhost")
             .setConfiguration(config)
             .setScmID("TestScmId1")
             .makeCA();
@@ -99,6 +99,11 @@ public class CertificateClientTestImpl implements 
CertificateClient {
   }
 
   @Override
+  public X509Certificate getCACertificate() {
+    return x509Certificate;
+  }
+
+  @Override
   public boolean verifyCertificate(X509Certificate certificate) {
     return true;
   }
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index e5a3d2f..0886d26 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -155,9 +155,13 @@ public class TestContainerReplicationEndToEnd {
             .getPipeline(pipelineID);
     key.close();
 
-    cluster.getStorageContainerManager().getContainerManager()
-        .updateContainerState(new ContainerID(containerID),
-            HddsProtos.LifeCycleEvent.FINALIZE);
+    if (cluster.getStorageContainerManager().getContainerManager()
+        .getContainer(new ContainerID(containerID)).getState() !=
+        HddsProtos.LifeCycleState.CLOSING) {
+      cluster.getStorageContainerManager().getContainerManager()
+          .updateContainerState(new ContainerID(containerID),
+              HddsProtos.LifeCycleEvent.FINALIZE);
+    }
     // wait for container to move to OPEN state in SCM
     Thread.sleep(2 * containerReportInterval);
     DatanodeDetails oldReplicaNode = pipeline.getFirstNode();
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index c2937a8..30a2593 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -18,14 +18,17 @@
 
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
-import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
@@ -33,13 +36,10 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.util.ThreadUtil;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Rule;
-import org.junit.Test;
+import org.junit.*;
 import org.junit.rules.TemporaryFolder;
 import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
@@ -49,13 +49,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.File;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.EnumSet;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_KEY_DIR_NAME_DEFAULT;
@@ -81,34 +79,24 @@ public class TestOzoneContainerWithTLS {
   public TemporaryFolder tempFolder = new TemporaryFolder();
 
   private OzoneConfiguration conf;
-  private SecurityConfig secConfig;
-  private Boolean requireMutualTls;
-
-  public TestOzoneContainerWithTLS(Boolean requireMutualTls) {
-    this.requireMutualTls = requireMutualTls;
+  private OzoneBlockTokenSecretManager secretManager;
+  private CertificateClientTestImpl caClient;
+  private boolean blockTokenEnabled;
 
+  public TestOzoneContainerWithTLS(boolean blockTokenEnabled) {
+    this.blockTokenEnabled = blockTokenEnabled;
   }
 
   @Parameterized.Parameters
-  public static Collection<Object[]> encryptionOptions() {
+  public static Collection<Object[]> enableBlockToken() {
     return Arrays.asList(new Object[][] {
-        {true},
-        {false}
+        {false},
+        {true}
     });
   }
 
-  private void copyResource(String inputResourceName, File outputFile) throws
-      IOException {
-    InputStream is = ThreadUtil.getResourceAsStream(inputResourceName);
-    try (OutputStream os = new FileOutputStream(outputFile)) {
-      IOUtils.copy(is, os);
-    } finally {
-      IOUtils.closeQuietly(is);
-    }
-  }
-
   @Before
-  public void setup() throws IOException{
+  public void setup() throws Exception {
     conf = new OzoneConfiguration();
     String ozoneMetaPath =
         GenericTestUtils.getTempPath("ozoneMeta");
@@ -125,21 +113,24 @@ public class TestOzoneContainerWithTLS {
     conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_ENABLED, true);
 
     conf.setBoolean(HddsConfigKeys.HDDS_GRPC_TLS_TEST_CERT, true);
-    secConfig = new SecurityConfig(conf);
 
-    copyResource("ssl/ca.crt", secConfig.getTrustStoreFile());
-    copyResource("ssl/server.pem", secConfig.getServerPrivateKeyFile());
-    copyResource("ssl/client.pem", secConfig.getClientPrivateKeyFile());
-    copyResource("ssl/client.crt", secConfig.getClientCertChainFile());
-    copyResource("ssl/server.crt", secConfig.getServerCertChainFile());
+    long expiryTime = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME,
+        HddsConfigKeys.HDDS_BLOCK_TOKEN_EXPIRY_TIME_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    caClient = new CertificateClientTestImpl(conf);
+    secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
+        expiryTime, caClient.getCertificate().
+        getSerialNumber().toString());
   }
 
   @Test
   public void testCreateOzoneContainer() throws Exception {
-    LOG.info("testCreateOzoneContainer with Mutual TLS: {}",
-        requireMutualTls);
-    conf.setBoolean(HddsConfigKeys.HDDS_GRPC_MUTUAL_TLS_REQUIRED,
-        requireMutualTls);
+    LOG.info("testCreateOzoneContainer with TLS and blockToken enabled: {}",
+        blockTokenEnabled);
+    conf.setBoolean(HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED,
+        blockTokenEnabled);
 
     long containerID = ContainerTestHelper.getTestContainerID();
     OzoneContainer container = null;
@@ -154,13 +145,25 @@ public class TestOzoneContainerWithTLS {
       conf.setBoolean(
           OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
 
-      container = new OzoneContainer(dn, conf, getContext(dn), null);
+      container = new OzoneContainer(dn, conf, getContext(dn), caClient);
       //Set scmId and manually start ozone container.
       container.start(UUID.randomUUID().toString());
 
-      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
-      client.connect();
-      createContainerForTesting(client, containerID);
+      XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf,
+          caClient.getCACertificate());
+
+      if (blockTokenEnabled) {
+        secretManager.start(caClient);
+        Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
+            "123", EnumSet.allOf(
+                HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+            RandomUtils.nextLong());
+        client.connect(token.encodeToUrlString());
+        createSecureContainerForTesting(client, containerID, token);
+      } else {
+        createContainerForTesting(client, containerID);
+        client.connect();
+      }
     } finally {
       if (container != null) {
         container.stop();
@@ -170,7 +173,6 @@ public class TestOzoneContainerWithTLS {
 
   public static void createContainerForTesting(XceiverClientSpi client,
       long containerID) throws Exception {
-    // Create container
     ContainerProtos.ContainerCommandRequestProto request =
         ContainerTestHelper.getCreateContainerRequest(
             containerID, client.getPipeline());
@@ -179,6 +181,18 @@ public class TestOzoneContainerWithTLS {
     Assert.assertNotNull(response);
   }
 
+  public static void createSecureContainerForTesting(XceiverClientSpi client,
+      long containerID, Token<OzoneBlockTokenIdentifier> token)
+      throws Exception {
+    ContainerProtos.ContainerCommandRequestProto request =
+        ContainerTestHelper.getCreateContainerSecureRequest(
+            containerID, client.getPipeline(), token);
+    ContainerProtos.ContainerCommandResponseProto response =
+        client.sendCommand(request);
+    Assert.assertNotNull(response);
+  }
+
+
   private StateContext getContext(DatanodeDetails datanodeDetails) {
     DatanodeStateMachine stateMachine = Mockito.mock(
         DatanodeStateMachine.class);
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
index a92cd3a..4c25b0c 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientManager.java
@@ -115,7 +115,7 @@ public class TestXceiverClientManager {
         TestXceiverClientManager.class.getName() + UUID.randomUUID());
     conf.set(HDDS_METADATA_DIR_NAME, metaDir);
     XceiverClientManager clientManager =
-        new XceiverClientManager(conf, clientConfig);
+        new XceiverClientManager(conf, clientConfig, null);
     Cache<String, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
@@ -173,7 +173,7 @@ public class TestXceiverClientManager {
         TestXceiverClientManager.class.getName() + UUID.randomUUID());
     conf.set(HDDS_METADATA_DIR_NAME, metaDir);
     XceiverClientManager clientManager =
-        new XceiverClientManager(conf, clientConfig);
+        new XceiverClientManager(conf, clientConfig, null);
     Cache<String, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
@@ -222,7 +222,7 @@ public class TestXceiverClientManager {
     ScmClientConfig clientConfig = conf.getObject(ScmClientConfig.class);
     clientConfig.setMaxSize(1);
     XceiverClientManager clientManager =
-        new XceiverClientManager(conf, clientConfig);
+        new XceiverClientManager(conf, clientConfig, null);
     Cache<String, XceiverClientSpi> cache =
         clientManager.getClientCache();
 
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
index 66a440d..de42be0 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMMetrics.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.om;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.sun.codemodel.internal.JExpression;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.metrics2.MetricsSystem;
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index f7297b7..d2d7256 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -117,6 +117,7 @@ import 
org.apache.hadoop.ozone.om.helpers.OmMultipartUploadListParts;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
+import org.apache.hadoop.ozone.om.helpers.ServiceInfoEx;
 import org.apache.hadoop.ozone.om.helpers.OzoneFileStatus;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
@@ -240,6 +241,7 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   private OzoneDelegationTokenSecretManager delegationTokenMgr;
   private OzoneBlockTokenSecretManager blockTokenMgr;
   private CertificateClient certClient;
+  private String caCertPem = null;
   private static boolean testSecureOmFlag = false;
   private final Text omRpcAddressTxt;
   private final OzoneConfiguration configuration;
@@ -1254,6 +1256,10 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
     metadataManager.start(configuration);
     startSecretManagerIfNecessary();
 
+    if (certClient != null) {
+      caCertPem = CertificateCodec.getPEMEncodedString(
+          certClient.getCACertificate());
+    }
     // Set metrics and start metrics back ground thread
     metrics.setNumVolumes(metadataManager.countRowsInTable(metadataManager
         .getVolumeTable()));
@@ -2592,6 +2598,11 @@ public final class OzoneManager extends 
ServiceRuntimeInfoImpl
   }
 
   @Override
+  public ServiceInfoEx getServiceInfo() throws IOException {
+    return new ServiceInfoEx(getServiceList(), caCertPem);
+  }
+
+  @Override
   /**
    * {@inheritDoc}
    */
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
index 3da17a9..01e59b4 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerRequestHandler.java
@@ -761,10 +761,12 @@ public class OzoneManagerRequestHandler implements 
RequestHandler {
       throws IOException {
     ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
 
-    resp.addAllServiceInfo(impl.getServiceList().stream()
+    resp.addAllServiceInfo(impl.getServiceInfo().getServiceInfoList().stream()
         .map(ServiceInfo::getProtobuf)
         .collect(Collectors.toList()));
-
+    if (impl.getServiceInfo().getCaCertificate() != null) {
+      resp.setCaCertificate(impl.getServiceInfo().getCaCertificate());
+    }
     return resp.build();
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to