This is an automated email from the ASF dual-hosted git repository.

pifta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 73e4e5b3bc HDDS-9138. Use sequence ID for certificate serial ID (#5163)
73e4e5b3bc is described below

commit 73e4e5b3bc9a9c45bafb63ff9aae6209c3120624
Author: Sammi Chen <[email protected]>
AuthorDate: Fri Oct 20 18:21:01 2023 +0800

    HDDS-9138. Use sequence ID for certificate serial ID (#5163)
---
 .../certificate/utils/CertificateSignRequest.java  | 17 +++++-
 .../certificate/utils/SelfSignedCertificate.java   | 12 ++---
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |  4 +-
 .../certificate/authority/CertificateApprover.java |  4 +-
 .../certificate/authority/CertificateServer.java   | 21 ++------
 .../certificate/authority/DefaultApprover.java     | 25 ++++-----
 .../certificate/authority/DefaultCAServer.java     | 29 +++++------
 .../client/DefaultCertificateClient.java           |  3 +-
 .../certificate/client/SCMCertificateClient.java   |  6 +--
 .../x509/certificate/authority/MockApprover.java   |  2 +-
 .../certificate/authority/TestDefaultCAServer.java | 31 +++++------
 .../client/CertificateClientTestImpl.java          |  6 ++-
 .../certificate/utils/TestRootCertificate.java     |  2 +-
 .../apache/hadoop/hdds/scm/ha/HASecurityUtils.java | 28 +++++-----
 .../hadoop/hdds/scm/ha/SequenceIdGenerator.java    | 60 +++++++++++++++-------
 .../scm/security/RootCARotationHandlerImpl.java    | 26 +---------
 .../hdds/scm/security/RootCARotationManager.java   |  8 +--
 .../hdds/scm/server/SCMSecurityProtocolServer.java | 28 +++++++---
 .../hdds/scm/server/StorageContainerManager.java   |  4 +-
 .../disabled-test-root-ca-rotation.sh              | 22 ++++----
 .../main/compose/ozonesecure/docker-compose.yaml   |  2 +
 .../main/compose/ozonesecure/root-ca-rotation.yaml |  2 +-
 .../compose/ozonesecure/test-root-ca-rotation.sh   | 15 ++++--
 hadoop-ozone/dist/src/main/compose/testlib.sh      | 23 +++++++++
 .../dist/src/main/smoketest/commonlib.robot        |  2 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |  3 +-
 26 files changed, 209 insertions(+), 176 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateSignRequest.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateSignRequest.java
index 547c51019e..c1cc671215 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateSignRequest.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateSignRequest.java
@@ -73,8 +73,11 @@ import static 
org.apache.hadoop.hdds.security.x509.exception.CertificateExceptio
  * PKCS10CertificationRequest to CertificateServer.
  */
 public final class CertificateSignRequest {
-  // Ozone Certificate distinguished format: (CN=Subject,OU=ScmID,O=ClusterID).
+  // Ozone final certificate distinguished format:
+  // (CN=Subject,OU=ScmID,O=ClusterID,SERIALNUMBER=SerialID).
   private static final String DISTINGUISHED_NAME_FORMAT = "CN=%s,OU=%s,O=%s";
+  private static final String DISTINGUISHED_NAME_WITH_SN_FORMAT =
+      "CN=%s,OU=%s,O=%s,SERIALNUMBER=%s";
   private static final Logger LOG =
       LoggerFactory.getLogger(CertificateSignRequest.class);
   private final KeyPair keyPair;
@@ -109,6 +112,18 @@ public final class CertificateSignRequest {
     return DISTINGUISHED_NAME_FORMAT;
   }
 
+  public static String getDistinguishedNameFormatWithSN() {
+    return DISTINGUISHED_NAME_WITH_SN_FORMAT;
+  }
+
+  // used by server side DN regeneration
+  public static X500Name getDistinguishedNameWithSN(String subject,
+      String scmID, String clusterID, String serialID) {
+    return new X500Name(String.format(DISTINGUISHED_NAME_WITH_SN_FORMAT,
+        subject, scmID, clusterID, serialID));
+  }
+
+  // used by client side DN generation
   public static X500Name getDistinguishedName(String subject, String scmID,
       String clusterID) {
     return new X500Name(String.format(getDistinguishedNameFormat(), subject,
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/SelfSignedCertificate.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/SelfSignedCertificate.java
index c44e499d4b..802c3ff07e 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/SelfSignedCertificate.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/SelfSignedCertificate.java
@@ -70,7 +70,6 @@ import static 
org.apache.hadoop.hdds.security.x509.exception.CertificateExceptio
  * provided.
  */
 public final class SelfSignedCertificate {
-  private static final String NAME_FORMAT = "CN=%s,OU=%s,O=%s";
   private String subject;
   private String clusterID;
   private String scmID;
@@ -101,7 +100,7 @@ public final class SelfSignedCertificate {
 
   @VisibleForTesting
   public static String getNameFormat() {
-    return NAME_FORMAT;
+    return CertificateSignRequest.getDistinguishedNameFormatWithSN();
   }
 
   public static Builder newBuilder() {
@@ -110,10 +109,6 @@ public final class SelfSignedCertificate {
 
   private X509CertificateHolder generateCertificate(BigInteger caCertSerialId)
       throws OperatorCreationException, IOException {
-    // For the Root Certificate we form the name from Subject, SCM ID and
-    // Cluster ID.
-    String dnName = String.format(getNameFormat(), subject, scmID, clusterID);
-    X500Name name = new X500Name(dnName);
     byte[] encoded = key.getPublic().getEncoded();
     SubjectPublicKeyInfo publicKeyInfo =
         SubjectPublicKeyInfo.getInstance(encoded);
@@ -129,6 +124,11 @@ public final class SelfSignedCertificate {
     } else {
       serial = caCertSerialId;
     }
+    // For the Root Certificate we form the name from Subject, SCM ID and
+    // Cluster ID.
+    String dnName = String.format(getNameFormat(),
+        subject, scmID, clusterID, serial);
+    X500Name name = new X500Name(dnName);
 
     // Valid from the Start of the day when we generate this Certificate.
     Date validFrom =
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index e802de0666..47610e29a3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -486,9 +486,9 @@ public final class OzoneConsts {
 
   // %s to distinguish different certificates
   public static final String SCM_SUB_CA = "scm-sub";
-  public static final String SCM_SUB_CA_PREFIX = SCM_SUB_CA + "-%s@";
+  public static final String SCM_SUB_CA_PREFIX = SCM_SUB_CA + "@";
   public static final String SCM_ROOT_CA = "scm";
-  public static final String SCM_ROOT_CA_PREFIX = SCM_ROOT_CA + "-%s@";
+  public static final String SCM_ROOT_CA_PREFIX = SCM_ROOT_CA + "@";
 
   // Layout Version written into Meta Table ONLY during finalization.
   public static final String LAYOUT_VERSION_KEY = "#LAYOUTVERSION";
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java
index 51ca989323..2d07630208 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateApprover.java
@@ -62,6 +62,7 @@ public interface CertificateApprover {
    * @param certificationRequest - Certification Request.
    * @param scmId - SCM id.
    * @param clusterId - Cluster id.
+   * @param certSerialId - the new certificate id.
    * @return Signed Certificate.
    * @throws IOException - On Error
    * @throws OperatorCreationException - on Error.
@@ -75,7 +76,8 @@ public interface CertificateApprover {
       Date validTill,
       PKCS10CertificationRequest certificationRequest,
       String scmId,
-      String clusterId)
+      String clusterId,
+      String certSerialId)
       throws IOException, OperatorCreationException;
 
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
index 819be49722..d74ee1ff0a 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/CertificateServer.java
@@ -23,7 +23,6 @@ import 
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
-import 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType;
 import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
 import org.bouncycastle.asn1.x509.CRLReason;
 import org.bouncycastle.cert.X509CertificateHolder;
@@ -97,29 +96,15 @@ public interface CertificateServer {
    * @param csr  - Certificate Signing Request.
    * @param type - An Enum which says what kind of approval process to follow.
    * @param role : OM/SCM/DN
+   * @param certSerialId - New certificate ID
    * @return A future that will have this certificate when this request is
    * approved.
    * @throws SCMSecurityException - on Error.
    */
   Future<CertPath> requestCertificate(
       PKCS10CertificationRequest csr,
-      CertificateApprover.ApprovalType type, NodeType role)
-      throws SCMSecurityException;
-
-
-  /**
-   * Request a Certificate based on Certificate Signing Request.
-   *
-   * @param csr       - Certificate Signing Request as a PEM encoded String.
-   * @param type      - An Enum which says what kind of approval process to
-   *                  follow.
-   * @param nodeType: OM/SCM/DN
-   * @return A future that will have this certificate when this request is
-   * approved.
-   * @throws SCMSecurityException - on Error.
-   */
-  Future<CertPath> requestCertificate(String csr,
-      ApprovalType type, NodeType nodeType) throws IOException;
+      CertificateApprover.ApprovalType type, NodeType role,
+      String certSerialId) throws SCMSecurityException;
 
   /**
    * Revokes a Certificate issued by this CertificateServer.
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
index bd394fe095..90969823e7 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultApprover.java
@@ -22,7 +22,6 @@ package 
org.apache.hadoop.hdds.security.x509.certificate.authority;
 import org.apache.hadoop.hdds.security.SecurityConfig;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import 
org.apache.hadoop.hdds.security.x509.certificate.authority.profile.PKIProfile;
-import org.apache.hadoop.util.Time;
 import org.bouncycastle.asn1.ASN1ObjectIdentifier;
 import org.bouncycastle.asn1.x500.X500Name;
 import org.bouncycastle.asn1.x500.style.BCStyle;
@@ -53,7 +52,7 @@ import java.util.Date;
 import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
-import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest.getDistinguishedName;
+import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest.getDistinguishedNameWithSN;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest.getPkcs9Extensions;
 
 /**
@@ -84,6 +83,7 @@ public class DefaultApprover extends BaseApprover {
    * @param certificationRequest - Certification Request.
    * @param scmId - SCM id.
    * @param clusterId - Cluster id.
+   * @param certSerialId - the new certificate id.
    * @return Signed Certificate.
    * @throws IOException - On Error
    * @throws OperatorCreationException - on Error.
@@ -98,7 +98,8 @@ public class DefaultApprover extends BaseApprover {
       Date validTill,
       PKCS10CertificationRequest certificationRequest,
       String scmId,
-      String clusterId) throws IOException,
+      String clusterId,
+      String certSerialId) throws IOException,
       OperatorCreationException {
 
     AlgorithmIdentifier sigAlgId = new
@@ -118,6 +119,8 @@ public class DefaultApprover extends BaseApprover {
         toASN1Primitive().toString();
     String csrClusterId = x500Name.getRDNs(BCStyle.O)[0].getFirst().getValue().
         toASN1Primitive().toString();
+    String cn = x500Name.getRDNs(BCStyle.CN)[0].getFirst().getValue()
+        .toASN1Primitive().toString();
 
     if (!clusterId.equals(csrClusterId)) {
       if (csrScmId.equalsIgnoreCase("null") &&
@@ -125,15 +128,16 @@ public class DefaultApprover extends BaseApprover {
         // Special case to handle DN certificate generation as DN might not 
know
         // scmId and clusterId before registration. In secure mode registration
         // will succeed only after datanode has a valid certificate.
-        String cn = x500Name.getRDNs(BCStyle.CN)[0].getFirst().getValue()
-            .toASN1Primitive().toString();
-        x500Name = getDistinguishedName(cn, scmId, clusterId);
+        csrClusterId = clusterId;
+        csrScmId = scmId;
       } else {
         // Throw exception if scmId and clusterId doesn't match.
         throw new SCMSecurityException("ScmId and ClusterId in CSR subject" +
             " are incorrect.");
       }
     }
+    x500Name = getDistinguishedNameWithSN(cn, csrScmId, csrClusterId,
+        certSerialId);
 
     RSAKeyParameters rsa =
         (RSAKeyParameters) PublicKeyFactory.createKey(keyInfo);
@@ -144,8 +148,7 @@ public class DefaultApprover extends BaseApprover {
     X509v3CertificateBuilder certificateGenerator =
         new X509v3CertificateBuilder(
             caCertificate.getSubject(),
-            // Serial is not sequential but it is monotonically increasing.
-            BigInteger.valueOf(generateSerialId()),
+            new BigInteger(certSerialId),
             validFrom,
             validTill,
             x500Name, keyInfo);
@@ -173,12 +176,6 @@ public class DefaultApprover extends BaseApprover {
 
   }
 
-  public long generateSerialId() {
-    // TODO: to make generation of serialId distributed.
-    // This issue will be fixed in HDDS-4999.
-    return Time.monotonicNowNanos();
-  }
-
   @Override
   public CompletableFuture<X509CertificateHolder> inspectCSR(String csr)
       throws IOException {
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index 0187405f93..6200f2d8da 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -21,6 +21,7 @@ package 
org.apache.hadoop.hdds.security.x509.certificate.authority;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
@@ -64,7 +65,6 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 
-import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest.getCertificationRequest;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.UNABLE_TO_ISSUE_CERTIFICATE;
 
 /**
@@ -228,7 +228,8 @@ public class DefaultCAServer implements CertificateServer {
   @Override
   public Future<CertPath> requestCertificate(
       PKCS10CertificationRequest csr,
-      CertificateApprover.ApprovalType approverType, NodeType role) {
+      CertificateApprover.ApprovalType approverType, NodeType role,
+      String certSerialId) {
     LocalDateTime beginDate = LocalDateTime.now();
     LocalDateTime endDate;
     // When issuing certificates for sub-ca use the max certificate duration
@@ -262,12 +263,14 @@ public class DefaultCAServer implements CertificateServer 
{
       case TESTING_AUTOMATIC:
         X509CertificateHolder xcert;
         try {
-          xcert = signAndStoreCertificate(beginDate, endDate, csr, role);
+          xcert = signAndStoreCertificate(
+              beginDate, endDate, csr, role, certSerialId);
         } catch (SCMSecurityException e) {
           // Certificate with conflicting serial id, retry again may resolve
           // this issue.
           LOG.error("Certificate storage failed, retrying one more time.", e);
-          xcert = signAndStoreCertificate(beginDate, endDate, csr, role);
+          xcert = signAndStoreCertificate(
+              beginDate, endDate, csr, role, certSerialId);
         }
         CertificateCodec codec = new CertificateCodec(config, componentName);
         CertPath certPath = codec.getCertPath();
@@ -286,19 +289,20 @@ public class DefaultCAServer implements CertificateServer 
{
   }
 
   private X509CertificateHolder signAndStoreCertificate(LocalDateTime 
beginDate,
-      LocalDateTime endDate, PKCS10CertificationRequest csr, NodeType role)
-      throws IOException,
-      OperatorCreationException, CertificateException {
+      LocalDateTime endDate, PKCS10CertificationRequest csr, NodeType role,
+      String certSerialId) throws IOException, OperatorCreationException,
+      CertificateException {
 
     lock.lock();
     X509CertificateHolder xcert;
     try {
+      Preconditions.checkState(!Strings.isNullOrEmpty(certSerialId));
       xcert = approver.sign(config,
           getCAKeys().getPrivate(),
           getCACertificate(),
           Date.from(beginDate.atZone(ZoneId.systemDefault()).toInstant()),
           Date.from(endDate.atZone(ZoneId.systemDefault()).toInstant()),
-          csr, scmID, clusterID);
+          csr, scmID, clusterID, certSerialId);
       if (store != null) {
         store.checkValidCertID(xcert.getSerialNumber());
         store.storeValidCertificate(xcert.getSerialNumber(),
@@ -310,15 +314,6 @@ public class DefaultCAServer implements CertificateServer {
     return xcert;
   }
 
-  @Override
-  public Future<CertPath> requestCertificate(String csr,
-      CertificateApprover.ApprovalType type, NodeType nodeType)
-      throws IOException {
-    PKCS10CertificationRequest request =
-        getCertificationRequest(csr);
-    return requestCertificate(request, type, nodeType);
-  }
-
   @Override
   public Future<Optional<Long>> revokeCertificates(
       List<BigInteger> certificates,
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 55f2e4e386..5e074336bc 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -1314,8 +1314,7 @@ public abstract class DefaultCertificateClient implements 
CertificateClient {
         securityConfig.getCertificateLocation(getComponentName())));
   }
 
-  public SCMSecurityProtocolClientSideTranslatorPB getScmSecureClient()
-      throws IOException {
+  public SCMSecurityProtocolClientSideTranslatorPB getScmSecureClient() {
     return scmSecurityClient;
   }
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
index 00e3dd1602..b01efd31bb 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/SCMCertificateClient.java
@@ -107,14 +107,12 @@ public class SCMCertificateClient extends 
DefaultCertificateClient {
    *
    * @return CertificateSignRequest.Builder
    */
-  @Override
   public CertificateSignRequest.Builder getCSRBuilder()
       throws CertificateException {
-    String subject = String.format(SCM_SUB_CA_PREFIX, System.nanoTime())
-        + scmHostname;
+    String subject = SCM_SUB_CA_PREFIX + scmHostname;
 
     LOG.info("Creating csr for SCM->hostName:{},scmId:{},clusterId:{}," +
-            "subject:{}", scmHostname, scmId, cId, subject);
+        "subject:{}", scmHostname, scmId, cId, subject);
 
     return super.getCSRBuilder()
         .setSubject(subject)
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java
index 96221d7a10..c9e520cdee 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/MockApprover.java
@@ -50,7 +50,7 @@ public class MockApprover extends BaseApprover {
       X509CertificateHolder caCertificate,
       Date validFrom, Date validTill,
       PKCS10CertificationRequest request,
-      String scmId, String clusterId)
+      String scmId, String clusterId, String certId)
       throws IOException, OperatorCreationException {
     return null;
   }
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
index fc7fd30bcd..df95737fd5 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/authority/TestDefaultCAServer.java
@@ -185,9 +185,6 @@ public class TestDefaultCAServer {
         .setKey(keyPair)
         .build();
 
-    // Let us convert this to a string to mimic the common use case.
-    String csrString = CertificateSignRequest.getEncodedString(csr);
-
     CertificateServer testCA = new DefaultCAServer("testCA",
         clusterId, scmId, caStore,
         new DefaultProfile(),
@@ -195,7 +192,8 @@ public class TestDefaultCAServer {
     testCA.init(securityConfig, CAType.ROOT);
 
     Future<CertPath> holder = testCA.requestCertificate(
-        csrString, CertificateApprover.ApprovalType.TESTING_AUTOMATIC, SCM);
+        csr, CertificateApprover.ApprovalType.TESTING_AUTOMATIC, SCM,
+        String.valueOf(System.nanoTime()));
     // Right now our calls are synchronous. Eventually this will have to wait.
     assertTrue(holder.isDone());
     //Test that the cert path returned contains the CA certificate in proper
@@ -238,9 +236,6 @@ public class TestDefaultCAServer {
         .setKey(keyPair)
         .build();
 
-    // Let us convert this to a string to mimic the common use case.
-    String csrString = CertificateSignRequest.getEncodedString(csr);
-
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
         RandomStringUtils.randomAlphabetic(4), caStore,
@@ -249,7 +244,8 @@ public class TestDefaultCAServer {
     testCA.init(securityConfig, CAType.ROOT);
 
     Future<CertPath> holder = testCA.requestCertificate(
-        csrString, CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM);
+        csr, CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM,
+        String.valueOf(System.nanoTime()));
     // Right now our calls are synchronous. Eventually this will have to wait.
     assertTrue(holder.isDone());
     assertNotNull(CertificateCodec.firstCertificateFrom(holder.get()));
@@ -278,11 +274,9 @@ public class TestDefaultCAServer {
         .setKey(keyPair)
         .build();
 
-    // Let us convert this to a string to mimic the common use case.
-    String csrString = CertificateSignRequest.getEncodedString(csr);
-
     Future<CertPath> holder = testCA.requestCertificate(
-        csrString, CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM);
+        csr, CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM,
+        String.valueOf(System.nanoTime()));
 
     X509Certificate certificate =
         CertificateCodec.firstCertificateFrom(holder.get());
@@ -322,9 +316,6 @@ public class TestDefaultCAServer {
         .setKey(keyPair)
         .build();
 
-    // Let us convert this to a string to mimic the common use case.
-    String csrString = CertificateSignRequest.getEncodedString(csr);
-
     CertificateServer testCA = new DefaultCAServer("testCA",
         RandomStringUtils.randomAlphabetic(4),
         RandomStringUtils.randomAlphabetic(4), caStore,
@@ -335,8 +326,9 @@ public class TestDefaultCAServer {
     ExecutionException execution = assertThrows(ExecutionException.class,
         () -> {
           Future<CertPath> holder =
-              testCA.requestCertificate(csrString,
-                  CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM);
+              testCA.requestCertificate(csr,
+                  CertificateApprover.ApprovalType.TESTING_AUTOMATIC, OM,
+                  String.valueOf(System.nanoTime()));
           holder.get();
         });
     assertTrue(execution.getCause().getMessage()
@@ -440,7 +432,7 @@ public class TestDefaultCAServer {
       X509CertificateHolder signedCert = approver.sign(securityConfig,
           keyPair.getPrivate(), externalCert,
           java.sql.Date.valueOf(beginDate), java.sql.Date.valueOf(endDate), 
csr,
-          scmId, clusterId);
+          scmId, clusterId, String.valueOf(System.nanoTime()));
       CertificateFactory certFactory = new CertificateFactory();
       CertificateCodec certificateCodec = new CertificateCodec(securityConfig,
           scmCertificateClient.getComponentName());
@@ -498,7 +490,8 @@ public class TestDefaultCAServer {
           .build();
 
       Future<CertPath> holder = rootCA.requestCertificate(csr,
-          CertificateApprover.ApprovalType.TESTING_AUTOMATIC, SCM);
+          CertificateApprover.ApprovalType.TESTING_AUTOMATIC, SCM,
+          String.valueOf(System.nanoTime()));
       assertTrue(holder.isDone());
       X509Certificate certificate =
           CertificateCodec.firstCertificateFrom(holder.get());
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
index 32fa5ef40e..286dd154c7 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClientTestImpl.java
@@ -146,7 +146,8 @@ public class CertificateClientTestImpl implements 
CertificateClient {
             Date.from(start.atZone(ZoneId.systemDefault()).toInstant()),
             Date.from(start.plus(Duration.parse(certDuration))
                 .atZone(ZoneId.systemDefault()).toInstant()),
-            csrBuilder.build(), "scm1", "cluster1");
+            csrBuilder.build(), "scm1", "cluster1",
+            String.valueOf(System.nanoTime()));
     x509Certificate =
         new JcaX509CertificateConverter().getCertificate(certificateHolder);
     certificateMap.put(x509Certificate.getSerialNumber().toString(),
@@ -337,7 +338,8 @@ public class CertificateClientTestImpl implements 
CertificateClient {
         approver.sign(securityConfig, rootKeyPair.getPrivate(),
             new X509CertificateHolder(rootCert.getEncoded()), start,
             new Date(start.getTime() + certDuration.toMillis()),
-            csrBuilder.build(), "scm1", "cluster1");
+            csrBuilder.build(), "scm1", "cluster1",
+            String.valueOf(System.nanoTime()));
     X509Certificate newX509Certificate =
         new JcaX509CertificateConverter().getCertificate(certificateHolder);
 
diff --git 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCertificate.java
 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCertificate.java
index b725bd9ca2..506f3da1db 100644
--- 
a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCertificate.java
+++ 
b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/utils/TestRootCertificate.java
@@ -102,7 +102,7 @@ public class TestRootCertificate {
 
     // Check the Subject Name and Issuer Name is in the expected format.
     String dnName = String.format(SelfSignedCertificate.getNameFormat(),
-        subject, scmID, clusterID);
+        subject, scmID, clusterID, certificateHolder.getSerialNumber());
     Assertions.assertEquals(dnName, certificateHolder.getIssuer().toString());
     Assertions.assertEquals(dnName, certificateHolder.getSubject().toString());
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
index 139a4ca99b..41b314aba5 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/HASecurityUtils.java
@@ -100,7 +100,7 @@ public final class HASecurityUtils {
     SecurityConfig securityConfig = new SecurityConfig(conf);
     SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
         getScmSecurityClientWithMaxRetry(conf, getCurrentUser());
-    try (CertificateClient certClient =
+    try (SCMCertificateClient certClient =
         new SCMCertificateClient(securityConfig, scmSecurityClient,
             scmStorageConfig.getScmId(), scmStorageConfig.getClusterID(),
             scmStorageConfig.getScmCertSerialId(), scmHostname)) {
@@ -137,10 +137,14 @@ public final class HASecurityUtils {
    * client.
    */
   private static void getRootCASignedSCMCert(
-      OzoneConfiguration configuration, CertificateClient client,
+      OzoneConfiguration configuration, SCMCertificateClient client,
       SecurityConfig securityConfig,
       SCMStorageConfig scmStorageConfig, String scmHostname) {
     try {
+      // Create SCM security client.
+      SCMSecurityProtocolClientSideTranslatorPB secureScmClient =
+          getScmSecurityClientWithFixedDuration(configuration);
+
       // Generate CSR.
       PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
           securityConfig, scmHostname);
@@ -151,10 +155,6 @@ public final class HASecurityUtils {
               .setHostName(scmHostname)
               .setScmNodeId(scmStorageConfig.getScmId()).build();
 
-      // Create SCM security client.
-      SCMSecurityProtocolClientSideTranslatorPB secureScmClient =
-          getScmSecurityClientWithFixedDuration(configuration);
-
       // Get SCM sub CA cert.
       SCMGetCertResponseProto response = secureScmClient.
           getSCMCertChain(scmNodeDetailsProto, getEncodedString(csr), false);
@@ -189,21 +189,22 @@ public final class HASecurityUtils {
    * For primary SCM get sub-ca signed certificate and root CA certificate by
    * root CA certificate server and store it using certificate client.
    */
-  private static void getPrimarySCMSelfSignedCert(CertificateClient client,
+  private static void getPrimarySCMSelfSignedCert(SCMCertificateClient client,
       SecurityConfig config, SCMStorageConfig scmStorageConfig,
       String scmHostname) {
-
     try {
-
       CertificateServer rootCAServer =
           initializeRootCertificateServer(config, null, scmStorageConfig,
               new DefaultCAProfile());
 
+      // First SCM sub CA certificate ID 2
+      String certId = BigInteger.ONE.add(BigInteger.ONE).toString();
+
       PKCS10CertificationRequest csr = generateCSR(client, scmStorageConfig,
           config, scmHostname);
 
       CertPath subSCMCertHolderList = rootCAServer.
-          requestCertificate(csr, KERBEROS_TRUSTED, SCM).get();
+          requestCertificate(csr, KERBEROS_TRUSTED, SCM, certId).get();
 
       CertPath rootCACertificatePath =
           rootCAServer.getCaCertPath();
@@ -251,7 +252,7 @@ public final class HASecurityUtils {
       SecurityConfig config, CertificateStore scmCertStore,
       SCMStorageConfig scmStorageConfig, BigInteger rootCertId,
       PKIProfile pkiProfile, String component) throws IOException {
-    String subject = String.format(SCM_ROOT_CA_PREFIX, rootCertId) +
+    String subject = SCM_ROOT_CA_PREFIX +
         InetAddress.getLocalHost().getHostName();
 
     DefaultCAServer rootCAServer = new DefaultCAServer(subject,
@@ -287,14 +288,13 @@ public final class HASecurityUtils {
    * Generate CSR to obtain SCM sub CA certificate.
    */
   private static PKCS10CertificationRequest generateCSR(
-      CertificateClient client, SCMStorageConfig scmStorageConfig,
+      SCMCertificateClient client, SCMStorageConfig scmStorageConfig,
       SecurityConfig config, String scmHostname)
       throws IOException {
     CertificateSignRequest.Builder builder = client.getCSRBuilder();
 
     // Get host name.
-    String subject = String.format(SCM_SUB_CA_PREFIX, System.nanoTime())
-        + scmHostname;
+    String subject = SCM_SUB_CA_PREFIX + scmHostname;
 
     builder.setConfiguration(config)
         .setScmID(scmStorageConfig.getScmId())
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SequenceIdGenerator.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SequenceIdGenerator.java
index 4e72376f85..93271d1474 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SequenceIdGenerator.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SequenceIdGenerator.java
@@ -69,6 +69,11 @@ public class SequenceIdGenerator {
   public static final String LOCAL_ID = "localId";
   public static final String DEL_TXN_ID = "delTxnId";
   public static final String CONTAINER_ID = "containerId";
+
+  // Certificate ID for all services, including root certificates, whose ID
+  // were using "rootCertificateId" before.
+  public static final String CERTIFICATE_ID = "CertificateId";
+  @Deprecated
   public static final String ROOT_CERTIFICATE_ID = "rootCertificateId";
 
   private static final long INVALID_SEQUENCE_ID = 0;
@@ -132,7 +137,7 @@ public class SequenceIdGenerator {
 
         Preconditions.checkArgument(Long.MAX_VALUE - batch.lastId >= 
batchSize);
         long nextLastId = batch.lastId +
-            (sequenceIdName.equals(ROOT_CERTIFICATE_ID) ? 1 : batchSize);
+            ((sequenceIdName.equals(CERTIFICATE_ID)) ? 1 : batchSize);
 
         if (stateManager.allocateBatch(sequenceIdName,
             prevLastId, nextLastId)) {
@@ -393,20 +398,35 @@ public class SequenceIdGenerator {
           CONTAINER_ID, sequenceIdTable.get(CONTAINER_ID));
     }
 
-    // upgrade root certificate ID
-    if (sequenceIdTable.get(ROOT_CERTIFICATE_ID) == null) {
-      long largestRootCertId = BigInteger.ONE.longValueExact();
+    upgradeToCertificateSequenceId(scmMetadataStore, false);
+  }
+
+  public static void upgradeToCertificateSequenceId(
+      SCMMetadataStore scmMetadataStore, boolean force) throws IOException {
+    Table<String, Long> sequenceIdTable = 
scmMetadataStore.getSequenceIdTable();
+
+    // upgrade certificate ID table
+    if (sequenceIdTable.get(CERTIFICATE_ID) == null || force) {
+      // Start from ID 2.
+      // ID 1 - root certificate, ID 2 - first SCM certificate.
+      long largestCertId = BigInteger.ONE.add(BigInteger.ONE).longValueExact();
       try (TableIterator<BigInteger,
           ? extends KeyValue<BigInteger, X509Certificate>> iterator =
                scmMetadataStore.getValidSCMCertsTable().iterator()) {
         while (iterator.hasNext()) {
           X509Certificate cert = iterator.next().getValue();
-          if (HASecurityUtils.isSelfSignedCertificate(cert) &&
-              HASecurityUtils.isCACertificate(cert)) {
-            largestRootCertId =
-                Long.max(cert.getSerialNumber().longValueExact(),
-                    largestRootCertId);
-          }
+          largestCertId = Long.max(cert.getSerialNumber().longValueExact(),
+              largestCertId);
+        }
+      }
+
+      try (TableIterator<BigInteger,
+          ? extends KeyValue<BigInteger, X509Certificate>> iterator =
+               scmMetadataStore.getValidCertsTable().iterator()) {
+        while (iterator.hasNext()) {
+          X509Certificate cert = iterator.next().getValue();
+          largestCertId = Long.max(
+              cert.getSerialNumber().longValueExact(), largestCertId);
         }
       }
 
@@ -416,17 +436,19 @@ public class SequenceIdGenerator {
         while (iterator.hasNext()) {
           X509Certificate cert =
               iterator.next().getValue().getX509Certificate();
-          if (HASecurityUtils.isSelfSignedCertificate(cert) &&
-              HASecurityUtils.isCACertificate(cert)) {
-            largestRootCertId =
-                Long.max(cert.getSerialNumber().longValueExact(),
-                    largestRootCertId);
-          }
+          largestCertId = Long.max(
+              cert.getSerialNumber().longValueExact(), largestCertId);
         }
       }
-      sequenceIdTable.put(ROOT_CERTIFICATE_ID, largestRootCertId);
-      LOG.info("upgrade {} to {}",
-          ROOT_CERTIFICATE_ID, sequenceIdTable.get(ROOT_CERTIFICATE_ID));
+      sequenceIdTable.put(CERTIFICATE_ID, largestCertId);
+      LOG.info("upgrade {} to {}", CERTIFICATE_ID,
+          sequenceIdTable.get(CERTIFICATE_ID));
+    }
+
+    // delete the ROOT_CERTIFICATE_ID record if exists
+    // ROOT_CERTIFICATE_ID is replaced with CERTIFICATE_ID now
+    if (sequenceIdTable.get(ROOT_CERTIFICATE_ID) != null) {
+      sequenceIdTable.delete(ROOT_CERTIFICATE_ID);
     }
   }
 }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationHandlerImpl.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationHandlerImpl.java
index aed03da70b..cdaf2d34c2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationHandlerImpl.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationHandlerImpl.java
@@ -115,31 +115,6 @@ public class RootCARotationHandlerImpl implements 
RootCARotationHandler {
       return;
     }
 
-    // Wait for the rotation preparation of this SCM to finish. The rotation
-    // preparation is running parallel in rotationManager's executor thread.
-    // If rotation preparation is not finished yet, then the later move
-    // new -> current operation will fail as the new directory may not exist
-    // yet.
-    long st = System.nanoTime();
-    long waitForNanos =
-        rotationManager.getSecurityConfig().getCaAckTimeout().toNanos();
-    String certId = newSubCACertId.get();
-    while (certId == null && (System.nanoTime() - st < waitForNanos)) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-        throw new IOException("Thread is interrupted");
-      }
-      certId = newSubCACertId.get();
-    }
-    if (certId == null) {
-      String message = "Failed to finish the rotation preparation in " +
-          rotationManager.getSecurityConfig().getCaAckTimeout();
-      LOG.error(message);
-      scm.shutDown(message);
-    }
-
     // switch sub CA key and certs directory on disk
     File currentSubCaDir = new File(secConfig.getLocation(
         scmCertClient.getComponentName()).toString());
@@ -173,6 +148,7 @@ public class RootCARotationHandlerImpl implements 
RootCARotationHandler {
     }
 
     try {
+      String certId = newSubCACertId.get();
       LOG.info("Persistent new scm certificate {}", certId);
       scm.getScmStorageConfig().setScmCertSerialId(certId);
       scm.getScmStorageConfig().persistCurrentState();
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationManager.java
index bb6ca796b1..08f8fc928d 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/security/RootCARotationManager.java
@@ -71,7 +71,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NEW_KEY_CERT_DIR_NAME_PROGRESS_SUFFIX;
 import static 
org.apache.hadoop.hdds.HddsConfigKeys.HDDS_NEW_KEY_CERT_DIR_NAME_SUFFIX;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_X509_DIR_NAME_DEFAULT;
-import static 
org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.ROOT_CERTIFICATE_ID;
+import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.CERTIFICATE_ID;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore.CertType.VALID_CERTS;
 import static org.apache.hadoop.ozone.OzoneConsts.SCM_ROOT_CA_COMPONENT_NAME;
 
@@ -227,10 +227,6 @@ public class RootCARotationManager extends StatefulService 
{
     return RootCARotationManager.class.getSimpleName();
   }
 
-  public SecurityConfig getSecurityConfig() {
-    return secConf;
-  }
-
   /**
    * Schedule monitor task.
    */
@@ -390,7 +386,7 @@ public class RootCARotationManager extends StatefulService {
           BigInteger newId = BigInteger.ONE;
           try {
             newId = new BigInteger(String.valueOf(
-                sequenceIdGen.getNextId(ROOT_CERTIFICATE_ID)));
+                sequenceIdGen.getNextId(CERTIFICATE_ID)));
             newRootCAServer =
                 HASecurityUtils.initializeRootCertificateServer(secConf,
                     scm.getCertificateStore(), scmStorageConfig, newId,
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index bec5a8dcff..bad326cad1 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -52,6 +52,7 @@ import 
org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolOmPB;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.protocolPB.SecretKeyProtocolScmPB;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import 
org.apache.hadoop.hdds.scm.protocol.SecretKeyProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.update.server.SCMUpdateServiceGrpcServer;
 import org.apache.hadoop.hdds.scm.update.client.UpdateServiceConfig;
@@ -78,12 +79,14 @@ import org.apache.hadoop.security.KerberosInfo;
 
 import org.apache.hadoop.security.UserGroupInformation;
 import org.bouncycastle.asn1.x509.CRLReason;
+import org.bouncycastle.pkcs.PKCS10CertificationRequest;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nullable;
 
 import static 
org.apache.hadoop.hdds.scm.ScmUtils.checkIfCertSignRequestAllowed;
+import static org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator.CERTIFICATE_ID;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_ENABLED;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecretKeyException.ErrorCode.SECRET_KEY_NOT_INITIALIZED;
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND;
@@ -91,6 +94,7 @@ import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.Err
 import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getPEMEncodedString;
+import static 
org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateSignRequest.getCertificationRequest;
 
 /**
  * The protocol used to perform security related operations with SCM.
@@ -113,6 +117,7 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol,
   private final StorageContainerManager storageContainerManager;
   private final CertificateClient scmCertificateClient;
   private final OzoneConfiguration config;
+  private final SequenceIdGenerator sequenceIdGen;
 
   // SecretKey may not be enabled when neither block token nor container
   // token is enabled.
@@ -130,6 +135,7 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol,
     this.scmCertificateServer = scmCertificateServer;
     this.scmCertificateClient = scmCertClient;
     this.config = conf;
+    this.sequenceIdGen = scm.getSequenceIdGen();
     this.secretKeyManager = secretKeyManager;
     final int handlerCount =
         conf.getInt(ScmConfigKeys.OZONE_SCM_SECURITY_HANDLER_COUNT_KEY,
@@ -188,14 +194,15 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol,
    */
   @Override
   public String getDataNodeCertificate(
-      DatanodeDetailsProto dnDetails,
-      String certSignReq) throws IOException {
+      DatanodeDetailsProto dnDetails, String certSignReq) throws IOException {
     LOGGER.info("Processing CSR for dn {}, UUID: {}", dnDetails.getHostName(),
         dnDetails.getUuid());
     Objects.requireNonNull(dnDetails);
+
     checkIfCertSignRequestAllowed(
         storageContainerManager.getRootCARotationManager(), false, config,
         "getDataNodeCertificate");
+
     return getEncodedCertToString(certSignReq, NodeType.DATANODE);
   }
 
@@ -207,9 +214,11 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol,
         nodeDetails.getNodeType(), nodeDetails.getHostName(),
         nodeDetails.getUuid());
     Objects.requireNonNull(nodeDetails);
+
     checkIfCertSignRequestAllowed(
         storageContainerManager.getRootCARotationManager(), false, config,
         "getCertificate");
+
     return getEncodedCertToString(certSignReq, nodeDetails.getNodeType());
   }
 
@@ -284,9 +293,11 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol,
     LOGGER.info("Processing CSR for om {}, UUID: {}", omDetails.getHostName(),
         omDetails.getUuid());
     Objects.requireNonNull(omDetails);
+
     checkIfCertSignRequestAllowed(
         storageContainerManager.getRootCARotationManager(), false, config,
         "getOMCertificate");
+
     return getEncodedCertToString(certSignReq, NodeType.OM);
   }
 
@@ -343,12 +354,13 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol,
   private synchronized String getEncodedCertToString(String certSignReq,
       NodeType nodeType) throws IOException {
     Future<CertPath> future;
+    PKCS10CertificationRequest csr = getCertificationRequest(certSignReq);
     if (nodeType == NodeType.SCM && rootCertificateServer != null) {
-      future = rootCertificateServer.requestCertificate(certSignReq,
-          KERBEROS_TRUSTED, nodeType);
+      future = rootCertificateServer.requestCertificate(csr,
+          KERBEROS_TRUSTED, nodeType, getNextCertificateId());
     } else {
-      future = scmCertificateServer.requestCertificate(certSignReq,
-          KERBEROS_TRUSTED, nodeType);
+      future = scmCertificateServer.requestCertificate(csr,
+          KERBEROS_TRUSTED, nodeType, getNextCertificateId());
     }
     try {
       return getPEMEncodedString(future.get());
@@ -519,6 +531,10 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol,
     return pemEncodedCerts;
   }
 
+  private String getNextCertificateId() throws IOException {
+    return String.valueOf(sequenceIdGen.getNextId(CERTIFICATE_ID));
+  }
+
   public SCMUpdateServiceGrpcServer getGrpcUpdateServer() {
     return grpcUpdateServer;
   }
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 8ecd5ed480..70636215b2 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -861,7 +861,7 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
     final CertificateServer rootCertificateServer;
 
     // Start specific instance SCM CA server.
-    String subject = String.format(SCM_SUB_CA_PREFIX, System.nanoTime()) +
+    String subject = SCM_SUB_CA_PREFIX +
         InetAddress.getLocalHost().getHostName();
     if (configurator.getCertificateServer() != null) {
       scmCertificateServer = configurator.getCertificateServer();
@@ -946,6 +946,8 @@ public final class StorageContainerManager extends 
ServiceRuntimeInfoImpl
       certificateStore.storeValidScmCertificate(
           rootCACert.getSerialNumber(), rootCACert);
     }
+    // Upgrade certificate sequence ID
+    SequenceIdGenerator.upgradeToCertificateSequenceId(scmMetadataStore, true);
   }
 
   public CertificateServer getRootCertificateServer() {
diff --git 
a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/disabled-test-root-ca-rotation.sh
 
b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/disabled-test-root-ca-rotation.sh
index c8138d8923..e83f343796 100755
--- 
a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/disabled-test-root-ca-rotation.sh
+++ 
b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/disabled-test-root-ca-rotation.sh
@@ -40,7 +40,7 @@ execute_robot_test scm1.org kinit.robot
 wait_for_execute_command scm1.org 30 "jps | grep 
StorageContainerManagerStarter | sed 's/StorageContainerManagerStarter//' | 
xargs  | xargs -I {} jstack {} | grep 'RootCARotationManager-Active'"
 
 # wait and verify root CA is rotated
-wait_for_execute_command scm1.org 240 "ozone admin cert info 2"
+wait_for_root_certificate scm1.org 240 2
 
 # transfer leader to scm2.org
 execute_robot_test scm1.org scmha/scm-leader-transfer.robot
@@ -54,21 +54,25 @@ execute_commands_in_container scm1.org "ozone sh volume 
create /r-v1 && ozone sh
 execute_robot_test scm1.org admincli/pipeline.robot
 
 # wait for next root CA rotation
-wait_for_execute_command scm1.org 240 "ozone admin cert info 3"
+wait_for_root_certificate scm1.org 240 3
 
 # bootstrap new SCM4 and verify certificate
 docker-compose up -d scm4.org
 wait_for_port scm4.org 9894 120
 execute_robot_test scm4.org kinit.robot
 wait_for_execute_command scm4.org 120 "ozone admin scm roles | grep scm4.org"
-wait_for_execute_command scm4.org 30 "ozone admin cert list --role=scm | grep 
scm4.org"
+wait_for_execute_command scm4.org 30 "ozone admin cert list --role=scm -c 100| 
grep scm4.org"
 
 # wait for next root CA rotation
-wait_for_execute_command scm4.org 240 "ozone admin cert info 4"
-
-wait_for_execute_command om1 30 "find /data/metadata/om/certs/ROOTCA-4.crt"
-wait_for_execute_command om2 30 "find /data/metadata/om/certs/ROOTCA-4.crt"
-wait_for_execute_command om3 30 "find /data/metadata/om/certs/ROOTCA-4.crt"
+wait_for_root_certificate scm4.org 240 4
+
+execute_robot_test om1 kinit.robot
+execute_robot_test om2 kinit.robot
+execute_robot_test om3 kinit.robot
+check_root_ca_file_cmd="ozone admin cert list --role=scm -c 100 | grep -v 
'scm-sub' | grep 'scm'  | cut -d ' ' -f 1 | sort | tail -n 1 | xargs -I {} echo 
/data/metadata/om/certs/ROOTCA-{}.crt | xargs find"
+wait_for_execute_command om1 30 $check_root_ca_file_cmd
+wait_for_execute_command om2 30 $check_root_ca_file_cmd
+wait_for_execute_command om3 30 $check_root_ca_file_cmd
 execute_robot_test scm4.org -v PREFIX:"rootca2" 
certrotation/root-ca-rotation-client-checks.robot
 
 #transfer leader to scm4.org
@@ -84,7 +88,7 @@ execute_robot_test scm3.org kinit.robot
 execute_robot_test scm4.org  -v "TARGET_SCM:scm3.org" 
scmha/scm-leader-transfer.robot
 
 # wait for next root CA rotation
-wait_for_execute_command scm3.org 240 "ozone admin cert info 5"
+wait_for_root_certificate scm3.org 240 5
 
 #decomission scm3.org
 execute_robot_test scm1.org scmha/scm-decommission.robot
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
index d50101bcc0..d577b27c38 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
@@ -25,6 +25,7 @@ services:
     command: ["krb5kdc","-n"]
   kms:
     image: apache/hadoop:${HADOOP_VERSION}
+    hostname: kms
     ports:
     - 9600:9600
     env_file:
@@ -36,6 +37,7 @@ services:
     command: ["hadoop", "kms"]
   datanode:
     image: ${OZONE_RUNNER_IMAGE}:${OZONE_RUNNER_VERSION}
+    hostname: dn
     volumes:
       - ../..:/opt/hadoop
       - ../_keytabs:/etc/security/keytabs
diff --git 
a/hadoop-ozone/dist/src/main/compose/ozonesecure/root-ca-rotation.yaml 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/root-ca-rotation.yaml
index 13a3df5668..d26425231b 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/root-ca-rotation.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/root-ca-rotation.yaml
@@ -36,7 +36,7 @@ x-root-cert-rotation-config:
     - OZONE-SITE.XML_ozone.scm.ha.ratis.request.timeout=2s
     - 
OZONE-SITE.XML_ozone.http.filter.initializers=org.apache.hadoop.security.HttpCrossOriginFilterInitializer
     - OZONE-SITE.XML_hdds.x509.ca.rotation.enabled=true
-    - OZONE-SITE.XML_hdds.x509.expired.certificate.check.interval=PT15s
+    - OZONE-SITE.XML_hdds.x509.expired.certificate.check.interval=PT30s
 services:
   datanode:
     <<: *root-cert-rotation-config
diff --git 
a/hadoop-ozone/dist/src/main/compose/ozonesecure/test-root-ca-rotation.sh 
b/hadoop-ozone/dist/src/main/compose/ozonesecure/test-root-ca-rotation.sh
index 5e97638cb8..818ac303e4 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/test-root-ca-rotation.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/test-root-ca-rotation.sh
@@ -39,21 +39,26 @@ execute_robot_test scm kinit.robot
 wait_for_execute_command scm 30 "jps | grep StorageContainerManagerStarter |  
sed 's/StorageContainerManagerStarter//' | xargs | xargs -I {} jstack {} | grep 
'RootCARotationManager-Active'"
 
 # wait and verify root CA is rotated
-wait_for_execute_command scm 180 "ozone admin cert info 2"
-wait_for_execute_command datanode 30 "find 
/data/metadata/dn/certs/ROOTCA-2.crt"
+wait_for_root_certificate scm 180 2
+execute_robot_test datanode kinit.robot
+check_root_ca_file_cmd="ozone admin cert list --role=scm | grep -v 'scm-sub' | 
grep 'scm'  | cut -d ' ' -f 1 | sort | tail -n 1 | xargs -I {} echo 
/data/metadata/dn/certs/ROOTCA-{}.crt | xargs find"
+wait_for_execute_command datanode 30 $check_root_ca_file_cmd
+
 # We need to wait here for the new certificate in OM as well, because it might
 # get to the OM later, and the client will not trust the DataNode with the new
 # certificate and will not refetch the CA certs as that will be implemented in
 # HDDS-8958.
-wait_for_execute_command om 30 "find /data/metadata/om/certs/ROOTCA-2.crt"
+execute_robot_test om kinit.robot
+check_root_ca_file_cmd="ozone admin cert list --role=scm | grep -v 'scm-sub' | 
grep 'scm'  | cut -d ' ' -f 1 | sort | tail -n 1 | xargs -I {} echo 
/data/metadata/om/certs/ROOTCA-{}.crt | xargs find"
+wait_for_execute_command om 30 $check_root_ca_file_cmd
 execute_robot_test scm -v PREFIX:"rootca" 
certrotation/root-ca-rotation-client-checks.robot
 
 # verify om operations and data operations
 execute_commands_in_container scm "ozone sh volume create /r-v1 && ozone sh 
bucket create /r-v1/r-b1"
 
 # wait for second root CA rotation
-wait_for_execute_command scm 180 "ozone admin cert info 3"
-wait_for_execute_command om 30 "find /data/metadata/om/certs/ROOTCA-3.crt"
+wait_for_root_certificate scm 180 3
+wait_for_execute_command om 30 $check_root_ca_file_cmd
 wait_for_execute_command scm 60 "! ozone admin cert info 1"
 execute_robot_test scm -v PREFIX:"rootca2" 
certrotation/root-ca-rotation-client-checks.robot
 # check the metrics
diff --git a/hadoop-ozone/dist/src/main/compose/testlib.sh 
b/hadoop-ozone/dist/src/main/compose/testlib.sh
index 91eb3c8660..0bf2f88b41 100755
--- a/hadoop-ozone/dist/src/main/compose/testlib.sh
+++ b/hadoop-ozone/dist/src/main/compose/testlib.sh
@@ -543,3 +543,26 @@ wait_for_datanode() {
   done
   echo "WARNING: $datanode is still not $state"
 }
+
+
+## @description wait for n root certificates
+wait_for_root_certificate(){
+  local container=$1
+  local timeout=$2
+  local count=$3
+  local command="ozone admin cert list --role=scm -c 100 | grep -v "scm-sub" | 
grep "scm" | wc -l"
+
+  #Reset the timer
+  SECONDS=0
+  while [[ $SECONDS -lt $timeout ]]; do
+    cert_number=`docker-compose exec -T $container /bin/bash -c "$command"`
+    if [[ $cert_number -eq $count ]]; then
+      echo "$count root certificates are found"
+      return
+    fi
+      echo "$count root certificates are not found yet"
+      sleep 1
+  done
+  echo "Timed out waiting on $count root certificates. Current timestamp " 
$(date +"%T")
+  return 1
+}
diff --git a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot 
b/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
index 62ea760efc..8f142027f3 100644
--- a/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/commonlib.robot
@@ -28,7 +28,7 @@ ${OM_SERVICE_ID}     om
 *** Keywords ***
 Get test user principal
     [arguments]         ${user}
-    ${instance} =       Execute                    hostname | sed 
's/scm[0-9].org/scm/'
+    ${instance} =       Execute                    hostname | sed 
's/scm[0-9].org/scm/' | sed 's/om[0-9]/om/'
     [return]            ${user}/${instance}@EXAMPLE.COM
 
 Kinit HTTP user
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index e437a0d9e6..ecd6bcfada 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -1422,7 +1422,8 @@ final class TestSecureOzoneCluster {
             Date.from(start.atZone(ZoneId.systemDefault()).toInstant()),
             Date.from(start.plus(certDuration)
                 .atZone(ZoneId.systemDefault()).toInstant()),
-            csrBuilder.build(), "test", clusterId);
+            csrBuilder.build(), "test", clusterId,
+            String.valueOf(System.nanoTime()));
     return certificateHolder;
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to