This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 20b8bca  HDDS-4876. [SCM HA Security] Add failover proxy to SCM 
Security Server Protocol (#1978)
20b8bca is described below

commit 20b8bca51e4e5b3f2aa04d630411cc7f117443b0
Author: Bharat Viswanadham <[email protected]>
AuthorDate: Tue Mar 9 23:21:28 2021 +0530

    HDDS-4876. [SCM HA Security] Add failover proxy to SCM Security Server 
Protocol (#1978)
---
 .../security/exception/SCMSecurityException.java   |  39 ++-
 .../x509/certificate/utils/CertificateCodec.java   |   3 +-
 .../SCMSecurityProtocolClientSideTranslatorPB.java |  31 +++
 .../SCMSecurityProtocolFailoverProxyProvider.java  | 281 +++++++++++++++++++++
 .../certificate/authority/DefaultCAServer.java     |   4 +-
 .../certificates/utils/CertificateSignRequest.java |   5 +-
 .../java/org/apache/hadoop/hdds/utils/HAUtils.java |  23 ++
 .../apache/hadoop/hdds/utils/HddsServerUtil.java   |  38 +--
 .../src/main/proto/ScmServerSecurityProtocol.proto |  13 +
 .../SCMSecurityProtocolServerSideTranslatorPB.java |  88 ++++---
 .../hdds/scm/server/SCMSecurityProtocolServer.java |  46 +++-
 .../hadoop/ozone/TestSecureOzoneCluster.java       |   5 +-
 12 files changed, 494 insertions(+), 82 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
index bbe25a9..95d6064 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/exception/SCMSecurityException.java
@@ -37,6 +37,16 @@ public class SCMSecurityException extends IOException {
 
   /**
    * Ctor.
+   * @param message - Error Message
+   * @param errorCode - Error code
+   */
+  public SCMSecurityException(String message, ErrorCode errorCode) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  /**
+   * Ctor.
    * @param message - Message.
    * @param cause  - Actual cause.
    */
@@ -47,11 +57,23 @@ public class SCMSecurityException extends IOException {
 
   /**
    * Ctor.
-   * @param message - Message.
+   * @param message - Error Message
+   * @param cause - Actual cause.
+   * @param errorCode - Error code.
+   */
+  public SCMSecurityException(String message, Throwable cause,
+      ErrorCode errorCode) {
+    super(message, cause);
+    this.errorCode = errorCode;
+  }
+
+  /**
+   * Ctor.
+   * @param cause - Actual cause.
    * @param error   - error code.
    */
-  public SCMSecurityException(String message, ErrorCode error) {
-    super(message);
+  public SCMSecurityException(Exception cause, ErrorCode error) {
+    super(cause);
     this.errorCode = error;
   }
 
@@ -72,6 +94,17 @@ public class SCMSecurityException extends IOException {
    * Error codes to make it easy to decode these exceptions.
    */
   public enum ErrorCode {
+    OK,
+    INVALID_CSR,
+    UNABLE_TO_ISSUE_CERTIFICATE,
+    GET_DN_CERTIFICATE_FAILED,
+    GET_OM_CERTIFICATE_FAILED,
+    GET_SCM_CERTIFICATE_FAILED,
+    GET_CERTIFICATE_FAILED,
+    GET_CA_CERT_FAILED,
+    CERTIFICATE_NOT_FOUND,
+    PEM_ENCODE_FAILED,
+    INTERNAL_ERROR,
     DEFAULT,
     MISSING_BLOCK_TOKEN,
     BLOCK_TOKEN_VERIFICATION_FAILED
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
index 1abdcc3..53d8e9a 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/utils/CertificateCodec.java
@@ -50,6 +50,7 @@ import static java.nio.charset.StandardCharsets.UTF_8;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_EXECUTE;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_READ;
 import static java.nio.file.attribute.PosixFilePermission.OWNER_WRITE;
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.PEM_ENCODE_FAILED;
 
 /**
  * A class used to read and write X.509 certificates  PEM encoded Streams.
@@ -125,7 +126,7 @@ public class CertificateCodec {
       LOG.error("Error in encoding certificate." + certificate
           .getSubjectDN().toString(), e);
       throw new SCMSecurityException("PEM Encoding failed for certificate." +
-          certificate.getSubjectDN().toString(), e);
+          certificate.getSubjectDN().toString(), e, PEM_ENCODE_FAILED);
     }
   }
 
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
index 672b95e..f54d228 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/protocolPB/SCMSecurityProtocolClientSideTranslatorPB.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.List;
 import java.util.function.Consumer;
 
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
@@ -37,7 +38,10 @@ import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecuri
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityRequest.Builder;
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Type;
+import 
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
@@ -58,12 +62,22 @@ public class SCMSecurityProtocolClientSideTranslatorPB 
implements
    */
   private static final RpcController NULL_RPC_CONTROLLER = null;
   private final SCMSecurityProtocolPB rpcProxy;
+  private SCMSecurityProtocolFailoverProxyProvider failoverProxyProvider;
 
   public SCMSecurityProtocolClientSideTranslatorPB(
       SCMSecurityProtocolPB rpcProxy) {
     this.rpcProxy = rpcProxy;
   }
 
+  public SCMSecurityProtocolClientSideTranslatorPB(
+      SCMSecurityProtocolFailoverProxyProvider proxyProvider) {
+    Preconditions.checkState(proxyProvider != null);
+    this.failoverProxyProvider = proxyProvider;
+    this.rpcProxy = (SCMSecurityProtocolPB) RetryProxy.create(
+        SCMSecurityProtocolPB.class, failoverProxyProvider,
+        failoverProxyProvider.getRetryPolicy());
+  }
+
   /**
    * Helper method to wrap the request and send the message.
    */
@@ -80,6 +94,9 @@ public class SCMSecurityProtocolClientSideTranslatorPB 
implements
       SCMSecurityRequest wrapper = builder.build();
 
       response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+
+      handleError(response);
+
     } catch (ServiceException ex) {
       throw ProtobufHelper.getRemoteException(ex);
     }
@@ -87,6 +104,20 @@ public class SCMSecurityProtocolClientSideTranslatorPB 
implements
   }
 
   /**
+   * Validates the response status and throws if the request failed.
+   * @param resp - SCMSecurityResponse received from the server.
+   * @return the same response when its status is OK.
+   * @throws SCMSecurityException if the response status is not OK.
+   */
+  private SCMSecurityResponse handleError(SCMSecurityResponse resp)
+      throws SCMSecurityException {
+    if (resp.getStatus() != SCMSecurityProtocolProtos.Status.OK) {
+      throw new SCMSecurityException(resp.getMessage(),
+          SCMSecurityException.ErrorCode.values()[resp.getStatus().ordinal()]);
+    }
+    return resp;
+  }
+  /**
    * Closes this stream and releases any system resources associated
    * with it. If the stream is already closed then invoking this
    * method has no effect.
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
new file mode 100644
index 0000000..a2d2fb3
--- /dev/null
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMSecurityProtocolFailoverProxyProvider.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.proxy;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.ha.SCMNodeInfo;
+import org.apache.hadoop.hdds.utils.HAUtils;
+import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource;
+import org.apache.hadoop.io.retry.FailoverProxyProvider;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static 
org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+
+/**
+ * Failover proxy provider for SCMSecurityProtocol server.
+ */
+public class SCMSecurityProtocolFailoverProxyProvider implements
+    FailoverProxyProvider<SCMSecurityProtocolPB>, Closeable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMSecurityProtocolFailoverProxyProvider.class);
+
+  // scmNodeId -> ProxyInfo<rpcProxy>
+  private final Map<String,
+      ProxyInfo<SCMSecurityProtocolPB>> scmProxies;
+
+  // scmNodeId -> SCMProxyInfo
+  private final Map<String, SCMProxyInfo> scmProxyInfoMap;
+
+  private List<String> scmNodeIds;
+
+  private String currentProxySCMNodeId;
+  private int currentProxyIndex;
+
+  private final ConfigurationSource conf;
+  private final SCMClientConfig scmClientConfig;
+  private final long scmVersion;
+
+  private String scmServiceId;
+
+  private final int maxRetryCount;
+  private final long retryInterval;
+
+  private final UserGroupInformation ugi;
+
+  /**
+   * Construct fail-over proxy provider for SCMSecurityProtocol Server.
+   * @param conf - Configuration used to resolve SCM node addresses.
+   * @param userGroupInformation - UGI used when creating the RPC proxies.
+   */
+  public SCMSecurityProtocolFailoverProxyProvider(ConfigurationSource conf,
+      UserGroupInformation userGroupInformation) {
+    Preconditions.checkNotNull(userGroupInformation);
+    this.ugi = userGroupInformation;
+    this.conf = conf;
+    this.scmVersion = RPC.getProtocolVersion(SCMSecurityProtocolPB.class);
+
+    this.scmProxies = new HashMap<>();
+    this.scmProxyInfoMap = new HashMap<>();
+    loadConfigs();
+
+    this.currentProxyIndex = 0;
+    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+    scmClientConfig = conf.getObject(SCMClientConfig.class);
+    this.maxRetryCount = scmClientConfig.getRetryCount();
+    this.retryInterval = scmClientConfig.getRetryInterval();
+  }
+
+  protected void loadConfigs() {
+    List<SCMNodeInfo> scmNodeInfoList = SCMNodeInfo.buildNodeInfo(conf);
+    scmNodeIds = new ArrayList<>();
+
+    for (SCMNodeInfo scmNodeInfo : scmNodeInfoList) {
+      if (scmNodeInfo.getScmSecurityAddress() == null) {
+        throw new ConfigurationException("SCM Client Address could not " +
+            "be obtained from config. Config is not properly defined");
+      } else {
+        InetSocketAddress scmSecurityAddress =
+            NetUtils.createSocketAddr(scmNodeInfo.getScmSecurityAddress());
+
+        scmServiceId = scmNodeInfo.getServiceId();
+        String scmNodeId = scmNodeInfo.getNodeId();
+
+        scmNodeIds.add(scmNodeId);
+        SCMProxyInfo scmProxyInfo = new SCMProxyInfo(scmServiceId, scmNodeId,
+            scmSecurityAddress);
+        scmProxyInfoMap.put(scmNodeId, scmProxyInfo);
+      }
+    }
+  }
+
+  @Override
+  public synchronized ProxyInfo<SCMSecurityProtocolPB> getProxy() {
+    ProxyInfo currentProxyInfo = scmProxies.get(getCurrentProxySCMNodeId());
+    if (currentProxyInfo == null) {
+      currentProxyInfo = createSCMProxy(getCurrentProxySCMNodeId());
+    }
+    return currentProxyInfo;
+  }
+
+  /**
+   * Creates proxy object.
+   */
+  private ProxyInfo createSCMProxy(String nodeId) {
+    ProxyInfo proxyInfo;
+    SCMProxyInfo scmProxyInfo = scmProxyInfoMap.get(nodeId);
+    InetSocketAddress address = scmProxyInfo.getAddress();
+    try {
+      SCMSecurityProtocolPB scmProxy = createSCMProxy(address);
+      // Create proxyInfo here, to make it work with all Hadoop versions.
+      proxyInfo = new ProxyInfo<>(scmProxy, scmProxyInfo.toString());
+      scmProxies.put(nodeId, proxyInfo);
+      return proxyInfo;
+    } catch (IOException ioe) {
+      LOG.error("{} Failed to create RPC proxy to SCM at {}",
+          this.getClass().getSimpleName(), address, ioe);
+      throw new RuntimeException(ioe);
+    }
+  }
+
+  private SCMSecurityProtocolPB createSCMProxy(InetSocketAddress scmAddress)
+      throws IOException {
+    Configuration hadoopConf =
+        LegacyHadoopConfigurationSource.asHadoopConfiguration(conf);
+    RPC.setProtocolEngine(hadoopConf, SCMSecurityProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    // FailoverOnNetworkException ensures that the IPC layer does not attempt
+    // retries on the same SCM in case of connection exception. This retry
+    // policy essentially results in TRY_ONCE_THEN_FAIL.
+
+    RetryPolicy connectionRetryPolicy = RetryPolicies
+        .failoverOnNetworkException(0);
+
+    return RPC.getProtocolProxy(SCMSecurityProtocolPB.class,
+        scmVersion, scmAddress, ugi,
+        hadoopConf, NetUtils.getDefaultSocketFactory(hadoopConf),
+        (int)scmClientConfig.getRpcTimeOut(), 
connectionRetryPolicy).getProxy();
+  }
+
+
+  @Override
+  public void performFailover(SCMSecurityProtocolPB currentProxy) {
+    if (LOG.isDebugEnabled()) {
+      int currentIndex = getCurrentProxyIndex();
+      LOG.debug("Failing over SCM Security proxy to index: {}, nodeId: {}",
+          currentIndex, scmNodeIds.get(currentIndex));
+    }
+  }
+
+  /**
+   * Performs fail-over to the next proxy.
+   */
+  public void performFailoverToNextProxy() {
+    int newProxyIndex = incrementProxyIndex();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Incrementing SCM Security proxy index to {}, nodeId: {}",
+          newProxyIndex, scmNodeIds.get(newProxyIndex));
+    }
+  }
+
+  /**
+   * Update the proxy index to the next proxy in the list.
+   * @return the new proxy index
+   */
+  private synchronized int incrementProxyIndex() {
+    currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size();
+    currentProxySCMNodeId = scmNodeIds.get(currentProxyIndex);
+    return currentProxyIndex;
+  }
+
+  public RetryPolicy getRetryPolicy() {
+    // Client will attempt up to maxFailovers number of failovers between
+    // available SCMs before throwing exception.
+    RetryPolicy retryPolicy = new RetryPolicy() {
+      @Override
+      public RetryAction shouldRetry(Exception exception, int retries,
+          int failovers, boolean isIdempotentOrAtMostOnce)
+          throws Exception {
+
+        if (LOG.isDebugEnabled()) {
+          if (exception.getCause() != null) {
+            LOG.debug("RetryProxy: SCM Security Server {}: {}: {}",
+                getCurrentProxySCMNodeId(),
+                exception.getCause().getClass().getSimpleName(),
+                exception.getCause().getMessage());
+          } else {
+            LOG.debug("RetryProxy: SCM {}: {}", getCurrentProxySCMNodeId(),
+                exception.getMessage());
+          }
+        }
+
+        // For AccessControlException, where the client is not authenticated,
+        // fail immediately instead of retrying.
+        if (HAUtils.isAccessControlException(exception)) {
+          return RetryAction.FAIL;
+        }
+
+        // Perform fail over to next proxy, as right now we don't have any
+        // suggested leader ID from server, we fail over to next one.
+        // TODO: Act based on server response if leader id is passed.
+        performFailoverToNextProxy();
+        return getRetryAction(FAILOVER_AND_RETRY, failovers);
+      }
+
+      private RetryAction getRetryAction(RetryDecision fallbackAction,
+          int failovers) {
+        if (failovers < maxRetryCount) {
+          return new RetryAction(fallbackAction, getRetryInterval());
+        } else {
+          return RetryAction.FAIL;
+        }
+      }
+    };
+
+    return retryPolicy;
+  }
+
+
+  @Override
+  public Class< SCMSecurityProtocolPB > getInterface() {
+    return SCMSecurityProtocolPB.class;
+  }
+
+  @Override
+  public void close() throws IOException {
+    for (Map.Entry<String, ProxyInfo<SCMSecurityProtocolPB>> proxy :
+        scmProxies.entrySet()) {
+      if (proxy.getValue() != null) {
+        RPC.stopProxy(proxy.getValue());
+      }
+      scmProxies.remove(proxy.getKey());
+    }
+  }
+
+  public synchronized String getCurrentProxySCMNodeId() {
+    return currentProxySCMNodeId;
+  }
+
+  public synchronized int getCurrentProxyIndex() {
+    return currentProxyIndex;
+  }
+
+  private long getRetryInterval() {
+    return retryInterval;
+  }
+}
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
index 440bd4c..2cd8993 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/authority/DefaultCAServer.java
@@ -63,6 +63,7 @@ import java.util.concurrent.Future;
 import java.util.function.Consumer;
 
 import static 
org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getCertificationRequest;
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.UNABLE_TO_ISSUE_CERTIFICATE;
 import static 
org.apache.hadoop.hdds.security.x509.exceptions.CertificateException.ErrorCode.CSR_ERROR;
 
 /**
@@ -254,7 +255,8 @@ public class DefaultCAServer implements CertificateServer {
       }
     } catch (CertificateException | IOException | OperatorCreationException e) 
{
       LOG.error("Unable to issue a certificate.", e);
-      xcertHolder.completeExceptionally(new SCMSecurityException(e));
+      xcertHolder.completeExceptionally(
+          new SCMSecurityException(e, UNABLE_TO_ISSUE_CERTIFICATE));
     }
     return xcertHolder;
   }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
index b26ad2c..b8d2859 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/security/x509/certificates/utils/CertificateSignRequest.java
@@ -59,6 +59,8 @@ import 
org.bouncycastle.pkcs.jcajce.JcaPKCS10CertificationRequestBuilder;
 import org.bouncycastle.util.io.pem.PemObject;
 import org.bouncycastle.util.io.pem.PemReader;
 
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.INVALID_CSR;
+
 /**
  * A certificate sign request object that wraps operations to build a
  * PKCS10CertificationRequest to CertificateServer.
@@ -134,7 +136,8 @@ public final class CertificateSignRequest {
     try (PemReader reader = new PemReader(new StringReader(csr))) {
       PemObject pemObject = reader.readPemObject();
       if(pemObject.getContent() == null) {
-        throw new SCMSecurityException("Invalid Certificate signing request");
+        throw new SCMSecurityException("Invalid Certificate signing request",
+            INVALID_CSR);
       }
       return new PKCS10CertificationRequest(pemObject.getContent());
     }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
index 4632b36..36d6bab 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HAUtils.java
@@ -17,6 +17,7 @@
 package org.apache.hadoop.hdds.utils;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
+import com.google.protobuf.ServiceException;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.AddSCMRequest;
 import org.apache.hadoop.hdds.scm.ScmInfo;
@@ -35,6 +36,8 @@ import org.apache.ratis.util.ExitUtils;
 import org.apache.ratis.util.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.AccessControlException;
 
 import java.io.File;
 import java.io.IOException;
@@ -285,4 +288,24 @@ public final class HAUtils {
     }
     return metadataDir;
   }
+
+  /**
+   * Unwrap exception to check if it is some kind of access control problem.
+   * {@link AccessControlException}
+   */
+  public static boolean isAccessControlException(Exception ex) {
+    if (ex instanceof ServiceException) {
+      Throwable t = ex.getCause();
+      if (t instanceof RemoteException) {
+        t = ((RemoteException) t).unwrapRemoteException();
+      }
+      while (t != null) {
+        if (t instanceof AccessControlException) {
+          return true;
+        }
+        t = t.getCause();
+      }
+    }
+    return false;
+  }
 }
diff --git 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
index 9e6ef22..ddc7e04 100644
--- 
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
+++ 
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/HddsServerUtil.java
@@ -46,16 +46,13 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
 import 
org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
 import org.apache.hadoop.hdds.recon.ReconConfigKeys;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
-import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import 
org.apache.hadoop.hdds.scm.proxy.SCMSecurityProtocolFailoverProxyProvider;
 import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdds.utils.db.DBCheckpoint;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.metrics2.MetricsException;
@@ -435,20 +432,9 @@ public final class HddsServerUtil {
    */
   public static SCMSecurityProtocolClientSideTranslatorPB getScmSecurityClient(
       OzoneConfiguration conf) throws IOException {
-    RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
-        ProtobufRpcEngine.class);
-    long scmVersion =
-        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
-    InetSocketAddress address =
-        getScmAddressForSecurityProtocol(conf);
-    RetryPolicy retryPolicy =
-        RetryPolicies.retryForeverWithFixedSleep(
-            1000, TimeUnit.MILLISECONDS);
     return new SCMSecurityProtocolClientSideTranslatorPB(
-        RPC.getProtocolProxy(SCMSecurityProtocolPB.class, scmVersion,
-            address, UserGroupInformation.getCurrentUser(),
-            conf, NetUtils.getDefaultSocketFactory(conf),
-            Client.getRpcTimeout(conf), retryPolicy).getProxy());
+        new SCMSecurityProtocolFailoverProxyProvider(conf,
+            UserGroupInformation.getCurrentUser()));
   }
 
 
@@ -489,17 +475,11 @@ public final class HddsServerUtil {
    */
   public static SCMSecurityProtocol getScmSecurityClient(
       OzoneConfiguration conf, UserGroupInformation ugi) throws IOException {
-    RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
-        ProtobufRpcEngine.class);
-    long scmVersion =
-        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
-    InetSocketAddress scmSecurityProtoAdd =
-        getScmAddressForSecurityProtocol(conf);
-    return new SCMSecurityProtocolClientSideTranslatorPB(
-        RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
-            scmSecurityProtoAdd, ugi, conf,
-            NetUtils.getDefaultSocketFactory(conf),
-            Client.getRpcTimeout(conf)));
+    SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
+        new SCMSecurityProtocolClientSideTranslatorPB(
+            new SCMSecurityProtocolFailoverProxyProvider(conf, ugi));
+    return TracingUtil.createProxy(scmSecurityClient,
+        SCMSecurityProtocol.class, conf);
   }
 
   /**
diff --git 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
index 0455952..48c6cf9 100644
--- 
a/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
+++ 
b/hadoop-hdds/interface-server/src/main/proto/ScmServerSecurityProtocol.proto
@@ -83,6 +83,19 @@ enum Type {
 
 enum Status {
     OK = 1;
+    INVALID_CSR = 2;
+    UNABLE_TO_ISSUE_CERTIFICATE = 3;
+    GET_DN_CERTIFICATE_FAILED = 4;
+    GET_OM_CERTIFICATE_FAILED = 5;
+    GET_SCM_CERTIFICATE_FAILED = 6;
+    GET_CERTIFICATE_FAILED = 7;
+    GET_CA_CERT_FAILED = 8;
+    CERTIFICATE_NOT_FOUND = 9;
+    PEM_ENCODE_FAILED = 10;
+    INTERNAL_ERROR = 11;
+    DEFAULT = 12;
+    MISSING_BLOCK_TOKEN = 13;
+    BLOCK_TOKEN_VERIFICATION_FAILED = 14;
 }
 /**
 * This message is send by data node to prove its identity and get an SCM
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
index babc87b..06da6e4 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/SCMSecurityProtocolServerSideTranslatorPB.java
@@ -33,6 +33,8 @@ import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecuri
 import 
org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.SCMSecurityResponse;
 import org.apache.hadoop.hdds.protocol.proto.SCMSecurityProtocolProtos.Status;
 import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
 import org.apache.hadoop.hdds.utils.ProtocolMessageMetrics;
 
@@ -54,14 +56,17 @@ public class SCMSecurityProtocolServerSideTranslatorPB
       LoggerFactory.getLogger(SCMSecurityProtocolServerSideTranslatorPB.class);
 
   private final SCMSecurityProtocol impl;
+  private final StorageContainerManager scm;
 
   private OzoneProtocolMessageDispatcher<SCMSecurityRequest,
       SCMSecurityResponse, ProtocolMessageEnum>
       dispatcher;
 
   public SCMSecurityProtocolServerSideTranslatorPB(SCMSecurityProtocol impl,
+      StorageContainerManager storageContainerManager,
       ProtocolMessageMetrics messageMetrics) {
     this.impl = impl;
+    this.scm = storageContainerManager;
     this.dispatcher =
         new OzoneProtocolMessageDispatcher<>("ScmSecurityProtocol",
             messageMetrics, LOG);
@@ -70,62 +75,73 @@ public class SCMSecurityProtocolServerSideTranslatorPB
   @Override
   public SCMSecurityResponse submitRequest(RpcController controller,
       SCMSecurityRequest request) throws ServiceException {
+    if (!scm.checkLeader()) {
+      throw new ServiceException(scm.getScmHAManager()
+          .getRatisServer()
+          .triggerNotLeaderException());
+    }
     return dispatcher.processRequest(request, this::processRequest,
         request.getCmdType(), request.getTraceID());
   }
 
-  public SCMSecurityResponse processRequest(SCMSecurityRequest request)
-      throws ServiceException {
+  public SCMSecurityResponse processRequest(SCMSecurityRequest request) {
+    SCMSecurityResponse.Builder scmSecurityResponse =
+        SCMSecurityResponse.newBuilder().setCmdType(request.getCmdType())
+        .setStatus(Status.OK);
     try {
       switch (request.getCmdType()) {
       case GetCertificate:
-        return SCMSecurityResponse.newBuilder()
-            .setCmdType(request.getCmdType())
-            .setStatus(Status.OK)
-            .setGetCertResponseProto(
-                getCertificate(request.getGetCertificateRequest()))
-            .build();
+        return scmSecurityResponse.setGetCertResponseProto(
+            getCertificate(request.getGetCertificateRequest())).build();
       case GetCACertificate:
-        return SCMSecurityResponse.newBuilder()
-            .setCmdType(request.getCmdType())
-            .setStatus(Status.OK)
-            .setGetCertResponseProto(
-                getCACertificate(request.getGetCACertificateRequest()))
-            .build();
+        return scmSecurityResponse.setGetCertResponseProto(
+            getCACertificate(request.getGetCACertificateRequest())).build();
       case GetOMCertificate:
-        return SCMSecurityResponse.newBuilder()
-            .setCmdType(request.getCmdType())
-            .setStatus(Status.OK)
-            .setGetCertResponseProto(
-                getOMCertificate(request.getGetOMCertRequest()))
+        return scmSecurityResponse.setGetCertResponseProto(
+            getOMCertificate(request.getGetOMCertRequest()))
             .build();
       case GetDataNodeCertificate:
-        return SCMSecurityResponse.newBuilder()
-            .setCmdType(request.getCmdType())
-            .setStatus(Status.OK)
-            .setGetCertResponseProto(
-                getDataNodeCertificate(request.getGetDataNodeCertRequest()))
+        return scmSecurityResponse.setGetCertResponseProto(
+            getDataNodeCertificate(request.getGetDataNodeCertRequest()))
             .build();
       case ListCertificate:
-        return SCMSecurityResponse.newBuilder()
-            .setCmdType(request.getCmdType())
-            .setStatus(Status.OK)
-            .setListCertificateResponseProto(
-                listCertificate(request.getListCertificateRequest()))
+        return scmSecurityResponse.setListCertificateResponseProto(
+            listCertificate(request.getListCertificateRequest()))
             .build();
       case GetSCMCertificate:
-        return SCMSecurityResponse.newBuilder()
-            .setCmdType(request.getCmdType())
-            .setStatus(Status.OK)
-            .setGetCertResponseProto(getSCMCertificate(
-                request.getGetSCMCertificateRequest()))
-            .build();
+        return scmSecurityResponse.setGetCertResponseProto(getSCMCertificate(
+            request.getGetSCMCertificateRequest())).build();
       default:
         throw new IllegalArgumentException(
             "Unknown request type: " + request.getCmdType());
       }
     } catch (IOException e) {
-      throw new ServiceException(e);
+      scmSecurityResponse.setSuccess(false);
+      scmSecurityResponse.setStatus(exceptionToResponseStatus(e));
+      // Prefer the exception's own message; if it is unset, fall back to
+      // the message of the underlying cause (when available).
+      if (e.getMessage() != null) {
+        scmSecurityResponse.setMessage(e.getMessage());
+      } else {
+        if (e.getCause() != null && e.getCause().getMessage() != null) {
+          scmSecurityResponse.setMessage(e.getCause().getMessage());
+        }
+      }
+      return scmSecurityResponse.build();
+    }
+  }
+
+  /**
+   * Convert exception to corresponding status.
+   * @param ex
+   * @return SCMSecurityProtocolProtos.Status code of the error.
+   */
+  private Status exceptionToResponseStatus(IOException ex) {
+    if (ex instanceof SCMSecurityException) {
+      return Status.values()[
+          ((SCMSecurityException) ex).getErrorCode().ordinal()];
+    } else {
+      return Status.INTERNAL_ERROR;
     }
   }
 
diff --git 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
index 3f3b360..5df3aa7 100644
--- 
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
+++ 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMSecurityProtocolServer.java
@@ -54,6 +54,9 @@ import org.bouncycastle.cert.X509CertificateHolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.CERTIFICATE_NOT_FOUND;
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CA_CERT_FAILED;
+import static 
org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.GET_CERTIFICATE_FAILED;
 import static 
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateApprover.ApprovalType.KERBEROS_TRUSTED;
 
 /**
@@ -91,7 +94,8 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
     BlockingService secureProtoPbService =
         SCMSecurityProtocolProtos.SCMSecurityProtocolService
             .newReflectiveBlockingService(
-                new SCMSecurityProtocolServerSideTranslatorPB(this, metrics));
+                new SCMSecurityProtocolServerSideTranslatorPB(this,
+                    scm, metrics));
     this.rpcServer =
         StorageContainerManager.startRpcServer(
             conf,
@@ -181,14 +185,34 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
       return CertificateCodec.getPEMEncodedString(future.get());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
-      throw new IOException("generate" + nodeType.toString() + "Certificate " +
-          "operation failed. ", e);
+      throw generateException(e, nodeType);
     } catch (ExecutionException e) {
-      throw new IOException("generate" + nodeType.toString() + "Certificate " +
-          "operation failed.", e);
+      if (e.getCause() != null) {
+        if (e.getCause() instanceof SCMSecurityException) {
+          throw (SCMSecurityException) e.getCause();
+        } else {
+          throw generateException(e, nodeType);
+        }
+      } else {
+        throw generateException(e, nodeType);
+      }
     }
   }
 
+  private SCMSecurityException generateException(Exception ex, NodeType role) {
+    SCMSecurityException.ErrorCode errorCode;
+    if (role == NodeType.SCM) {
+      errorCode = SCMSecurityException.ErrorCode.GET_SCM_CERTIFICATE_FAILED;
+    } else if (role == NodeType.OM) {
+      errorCode = SCMSecurityException.ErrorCode.GET_OM_CERTIFICATE_FAILED;
+    } else {
+      errorCode = SCMSecurityException.ErrorCode.GET_DN_CERTIFICATE_FAILED;
+    }
+    return new SCMSecurityException("generate " + role.toString() +
+        " Certificate operation failed", ex, errorCode);
+
+  }
+
   /**
    * Get SCM signed certificate with given serial id.
    *
@@ -206,10 +230,12 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
         return CertificateCodec.getPEMEncodedString(certificate);
       }
     } catch (CertificateException e) {
-      throw new IOException("getCertificate operation failed. ", e);
+      throw new SCMSecurityException("getCertificate operation failed. ", e,
+          GET_CERTIFICATE_FAILED);
     }
     LOGGER.debug("Certificate with serial id {} not found.", certSerialId);
-    throw new IOException("Certificate not found");
+    throw new SCMSecurityException("Certificate not found",
+        CERTIFICATE_NOT_FOUND);
   }
 
   /**
@@ -224,7 +250,8 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
       return CertificateCodec.getPEMEncodedString(
           certificateServer.getCACertificate());
     } catch (CertificateException e) {
-      throw new IOException("getRootCertificate operation failed. ", e);
+      throw new SCMSecurityException("getRootCertificate operation failed. ",
+          e, GET_CA_CERT_FAILED);
     }
   }
 
@@ -249,7 +276,8 @@ public class SCMSecurityProtocolServer implements 
SCMSecurityProtocol {
         String certStr = CertificateCodec.getPEMEncodedString(cert);
         results.add(certStr);
       } catch (SCMSecurityException e) {
-        throw new IOException("listCertificate operation failed. ", e);
+        throw new SCMSecurityException("listCertificate operation failed.",
+            e, e.getErrorCode());
       }
     }
     return results;
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 65c70b8..f0adadd 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMRatisServerImpl;
 import org.apache.hadoop.hdds.scm.server.SCMHTTPServerConfig;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
@@ -52,7 +53,6 @@ import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
 import org.apache.hadoop.hdds.utils.HddsServerUtil;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.minikdc.MiniKdc;
@@ -302,7 +302,8 @@ public final class TestSecureOzoneCluster {
       assertNotNull(scmSecurityProtocolClient);
       String caCert = scmSecurityProtocolClient.getCACertificate();
       assertNotNull(caCert);
-      LambdaTestUtils.intercept(RemoteException.class, "Certificate not found",
+      LambdaTestUtils.intercept(SCMSecurityException.class,
+          "Certificate not found",
           () -> scmSecurityProtocolClient.getCertificate("1"));
 
       // Case 2: User without Kerberos credentials should fail.


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to