This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch HDDS-4440-s3-performance
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 76e238118de06bfa6f4c997b92ae57baa79f5a1b
Author: Neil Joshi <[email protected]>
AuthorDate: Wed Mar 9 12:19:01 2022 -0700

    HDDS-5544. Update GRPC OmTransport implementation for HA (#2901)
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |  20 +++
 .../java/org/apache/hadoop/hdds/TestHddsUtils.java |  39 +++-
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   3 +-
 .../ozone/om/ha/GrpcOMFailoverProxyProvider.java   | 143 +++++++++++++++
 .../ozone/om/ha/OMFailoverProxyProvider.java       |  22 +--
 .../ozone/om/protocolPB/GrpcOmTransport.java       | 196 +++++++++++++++++----
 .../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 119 +++++++++++--
 .../src/main/compose/ozone-om-ha/docker-config     |   1 +
 .../src/main/compose/ozonesecure-ha/docker-config  |   1 +
 .../dist/src/main/compose/ozonesecure-ha/test.sh   |   2 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   3 +-
 .../hadoop/ozone/om/GrpcOzoneManagerServer.java    |  20 ++-
 .../hadoop/ozone/om/OzoneManagerServiceGrpc.java   |  43 +----
 .../hadoop/ozone/om/failover/TestOMFailovers.java  |   2 +-
 14 files changed, 516 insertions(+), 98 deletions(-)

diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index ffbb3e3340..364377d396 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -227,6 +227,26 @@ public final class HddsUtils {
     }
   }
 
+  /**
+   * Retrieve a number, trying the supplied config keys in order.
+   * Each config value may be absent.
+   *
+   * @param conf Conf
+   * @param keys a list of configuration key names.
+   *
+   * @return first number found from the given keys, or absent.
+   */
+  public static OptionalInt getNumberFromConfigKeys(
+      ConfigurationSource conf, String... keys) {
+    for (final String key : keys) {
+      final String value = conf.getTrimmed(key);
+      if (value != null) {
+        return OptionalInt.of(Integer.parseInt(value));
+      }
+    }
+    return OptionalInt.empty();
+  }
+
   /**
    * Retrieve the port number, trying the supplied config keys in order.
    * Each config value may be absent, or if present in the format
diff --git 
a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java 
b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
index fd8aa28e63..67001010d5 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
@@ -36,6 +36,8 @@ import static 
org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
 import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
+
 import static org.hamcrest.core.Is.is;
 import org.junit.Assert;
 import static org.junit.Assert.assertThat;
@@ -216,4 +218,39 @@ public class TestHddsUtils {
 
   }
 
-}
\ No newline at end of file
+  @Test
+  public void testGetNumberFromConfigKeys() {
+    final String testnum1 = "8";
+    final String testnum2 = "7";
+    final String serviceId = "id1";
+    final String nodeId = "scm1";
+
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
+        testnum1);
+    Assert.assertTrue(Integer.parseInt(testnum1) ==
+        HddsUtils.getNumberFromConfigKeys(
+            conf,
+            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+    /* Test that the first non-empty key's number is returned from the list */
+    /* first key is absent */
+    Assert.assertTrue(Integer.parseInt(testnum1) ==
+        HddsUtils.getNumberFromConfigKeys(
+            conf,
+            ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+                serviceId, nodeId),
+            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+
+    /* now set the previously absent key and ensure its value is returned */
+    conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+            serviceId, nodeId),
+        testnum2);
+    Assert.assertTrue(Integer.parseInt(testnum2) ==
+        HddsUtils.getNumberFromConfigKeys(
+            conf,
+            ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
+                serviceId, nodeId),
+            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
+  }
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index cdd9e52667..6ebd7e11ad 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -57,7 +57,8 @@ public final class OMConfigKeys {
   public static final String OZONE_OM_BIND_HOST_DEFAULT =
       "0.0.0.0";
   public static final int OZONE_OM_PORT_DEFAULT = 9862;
-
+  public static final String OZONE_OM_GRPC_PORT_KEY =
+      "ozone.om.grpc.port";
   public static final String OZONE_OM_HTTP_ENABLED_KEY =
       "ozone.om.http.enabled";
   public static final String OZONE_OM_HTTP_BIND_HOST_KEY =
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
new file mode 100644
index 0000000000..498f935974
--- /dev/null
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.ha;
+
+import org.apache.hadoop.hdds.conf.ConfigurationException;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
+import org.apache.hadoop.security.UserGroupInformation;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalInt;
+
+import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+
+/**
+ * The Grpc s3gateway om transport failover proxy provider implementation
+ * extending the ozone client OM failover proxy provider.  This implementation
+ * allows the Grpc OMTransport to reuse OM failover retry policies and
+ * getRetryAction methods.  In case of OM failover, the client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class GrpcOMFailoverProxyProvider<T> extends
+    OMFailoverProxyProvider<T> {
+
+  private Map<String, String> omAddresses;
+
+  public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
+                                     UserGroupInformation ugi,
+                                     String omServiceId,
+                                     Class<T> protocol) throws IOException {
+    super(configuration, ugi, omServiceId, protocol);
+  }
+
+  @Override
+  protected void loadOMClientConfigs(ConfigurationSource config, String 
omSvcId)
+      throws IOException {
+    // to be used for base class omProxies,
+    // ProxyInfo not applicable for gRPC, just need key set
+    Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
+    // to be used for base class omProxyInfos
+    // OMProxyInfo not applicable for gRPC, just need key set
+    Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
+    List<String> omNodeIDList = new ArrayList<>();
+    omAddresses = new HashMap<>();
+
+    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
+
+    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+      String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+          omSvcId, nodeId);
+
+      Optional<String> hostaddr = getHostNameFromConfigKeys(config,
+          rpcAddrKey);
+
+      OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
+          ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+              omSvcId, nodeId),
+          OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+      if (nodeId == null) {
+        nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
+      }
+      omProxiesNodeIdKeyset.put(nodeId, null);
+      omProxyInfosNodeIdKeyset.put(nodeId, null);
+      if (hostaddr.isPresent()) {
+        omAddresses.put(nodeId,
+            hostaddr.get() + ":"
+                + hostport.orElse(config
+                .getObject(GrpcOmTransport
+                    .GrpcOmTransportConfig.class)
+                .getPort()));
+      } else {
+        LOG.error("expected host address not defined for: {}", rpcAddrKey);
+        throw new ConfigurationException(rpcAddrKey + "is not defined");
+      }
+      omNodeIDList.add(nodeId);
+    }
+
+    if (omProxiesNodeIdKeyset.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for OM. Please configure the system with "
+          + OZONE_OM_ADDRESS_KEY);
+    }
+
+    // set base class omProxies, omProxyInfos, omNodeIDList
+
+    // omProxies needed in base class
+    // omProxies.size == number of om nodes
+    // omProxies key needs to be valid nodeid
+    // omProxyInfos keyset needed in base class
+    setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
+  }
+
+  @Override
+  protected Text computeDelegationTokenService() {
+    return new Text();
+  }
+
+  // need to throw if nodeID not in omAddresses
+  public String getGrpcProxyAddress(String nodeId) throws IOException {
+    if (omAddresses.containsKey(nodeId)) {
+      return omAddresses.get(nodeId);
+    } else {
+      LOG.error("expected nodeId not found in omAddresses for proxyhost {}",
+          nodeId);
+      throw new IOException(
+          "expected nodeId not found in omAddresses for proxyhost");
+    }
+
+  }
+
+  public List<String> getGrpcOmNodeIDList() {
+    return getOmNodeIDList();
+  }
+}
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 5432468452..9fb690e760 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -148,8 +148,6 @@ public class OMFailoverProxyProvider<T> implements
             rpcAddrStr);
 
         if (omProxyInfo.getAddress() != null) {
-
-
           // For a non-HA OM setup, nodeId might be null. If so, we assign it
           // the default value
           if (nodeId == null) {
@@ -551,14 +549,18 @@ public class OMFailoverProxyProvider<T> implements
     return null;
   }
 
-  @VisibleForTesting
-  protected void setProxiesForTesting(
-      Map<String, ProxyInfo<T>> testOMProxies,
-      Map<String, OMProxyInfo> testOMProxyInfos,
-      List<String> testOMNodeIDList) {
-    this.omProxies = testOMProxies;
-    this.omProxyInfos = testOMProxyInfos;
-    this.omNodeIDList = testOMNodeIDList;
+  protected void setProxies(
+      Map<String, ProxyInfo<T>> setOMProxies,
+      Map<String, OMProxyInfo> setOMProxyInfos,
+      List<String> setOMNodeIDList) {
+    this.omProxies = setOMProxies;
+    this.omProxyInfos = setOMProxyInfos;
+    this.omNodeIDList = setOMNodeIDList;
   }
+
+  protected List<String> getOmNodeIDList() {
+    return omNodeIDList;
+  }
+
 }
 
diff --git 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index 3607429e52..72c29f0cc6 100644
--- 
a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++ 
b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -18,22 +18,34 @@
 package org.apache.hadoop.ozone.om.protocolPB;
 
 import java.io.IOException;
-import java.util.Optional;
+import java.lang.reflect.Constructor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 
+import com.google.common.net.HostAndPort;
 import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import org.apache.hadoop.ipc.RemoteException;
+
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.security.UserGroupInformation;
 
+import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
 import io.grpc.ManagedChannel;
 import io.grpc.netty.NettyChannelBuilder;
@@ -42,12 +54,10 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
-import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 
 /**
  * Grpc transport for grpc between s3g and om.
@@ -60,60 +70,171 @@ public class GrpcOmTransport implements OmTransport {
   private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
   // gRPC specific
-  private ManagedChannel channel;
-
   private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client;
+  private Map<String,
+      OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
+  private Map<String, ManagedChannel> channels;
+  private int lastVisited = -1;
+  private ConfigurationSource conf;
 
-  private String host = "om";
-  private int port = 8981;
+  //private String host = "om";
+  private AtomicReference<String> host;
   private int maxSize;
 
+  private List<String> oms;
+  private RetryPolicy retryPolicy;
+  private int failoverCount = 0;
+  private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
+      omFailoverProxyProvider;
+
   public GrpcOmTransport(ConfigurationSource conf,
                           UserGroupInformation ugi, String omServiceId)
       throws IOException {
-    Optional<String> omHost = getHostNameFromConfigKeys(conf,
-        OZONE_OM_ADDRESS_KEY);
-    this.host = omHost.orElse("0.0.0.0");
 
-    port = conf.getObject(GrpcOmTransportConfig.class).getPort();
+    this.channels = new HashMap<>();
+    this.clients = new HashMap<>();
+    this.conf = conf;
+    this.host = new AtomicReference();
 
     maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
         OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
 
+    omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
+        conf,
+        ugi,
+        omServiceId,
+        OzoneManagerProtocolPB.class);
+
     start();
   }
 
-  public void start() {
+  public void start() throws IOException {
+    host.set(omFailoverProxyProvider
+        .getGrpcProxyAddress(
+            omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+
     if (!isRunning.compareAndSet(false, true)) {
       LOG.info("Ignore. already started.");
       return;
     }
-    NettyChannelBuilder channelBuilder =
-        NettyChannelBuilder.forAddress(host, port)
-            .usePlaintext()
-            .maxInboundMessageSize(maxSize);
 
-    channel = channelBuilder.build();
-    client = OzoneManagerServiceGrpc.newBlockingStub(channel);
+    List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+    for (String nodeId : nodes) {
+      String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+      HostAndPort hp = HostAndPort.fromString(hostaddr);
+
+      NettyChannelBuilder channelBuilder =
+          NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort())
+              .usePlaintext()
+              .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
+      channels.put(hostaddr, channelBuilder.build());
+      clients.put(hostaddr,
+          OzoneManagerServiceGrpc
+              .newBlockingStub(channels.get(hostaddr)));
+    }
+    int maxFailovers = conf.getInt(
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
 
+
+    retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers);
     LOG.info("{}: started", CLIENT_NAME);
   }
 
   @Override
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     OMResponse resp = null;
-    try {
-      resp = client.submitRequest(payload);
-    } catch (io.grpc.StatusRuntimeException e) {
-      ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
-      if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
-        resultCode = ResultCodes.TIMEOUT;
+    boolean tryOtherHost = true;
+    ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
+    while (tryOtherHost) {
+      tryOtherHost = false;
+      try {
+        resp = clients.get(host.get()).submitRequest(payload);
+      } catch (StatusRuntimeException e) {
+        if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
+          resultCode = ResultCodes.TIMEOUT;
+        }
+        Exception exp = new Exception(e);
+        tryOtherHost = shouldRetry(unwrapException(exp));
+        if (!tryOtherHost) {
+          throw new OMException(resultCode);
+        }
       }
-      throw new OMException(e.getCause(), resultCode);
     }
     return resp;
   }
 
+  private Exception unwrapException(Exception ex) {
+    Exception grpcException = null;
+    try {
+      StatusRuntimeException srexp =
+          (StatusRuntimeException)ex.getCause();
+      Status status = srexp.getStatus();
+      LOG.debug("GRPC exception wrapped: {}", status.getDescription());
+      if (status.getCode() == Status.Code.INTERNAL) {
+        // exception potentially generated by OzoneManagerServiceGrpc
+        Class<?> realClass = Class.forName(status.getDescription()
+            .substring(0, status.getDescription()
+                .indexOf(":")));
+        Class<? extends Exception> cls = realClass
+            .asSubclass(Exception.class);
+        Constructor<? extends Exception> cn = cls.getConstructor(String.class);
+        cn.setAccessible(true);
+        grpcException = cn.newInstance(status.getDescription());
+        IOException remote = null;
+        try {
+          String cause = status.getDescription();
+          cause = cause.substring(cause.indexOf(":") + 2);
+          remote = new RemoteException(cause.substring(0, cause.indexOf(":")),
+              cause.substring(cause.indexOf(":") + 1));
+          grpcException.initCause(remote);
+        } catch (Exception e) {
+          LOG.error("cannot get cause for remote exception");
+        }
+      } else {
+        // exception generated by connection failure, gRPC
+        grpcException = ex;
+      }
+    } catch (Exception e) {
+      grpcException = new IOException(e);
+      LOG.error("error unwrapping exception from OMResponse {}");
+    }
+    return grpcException;
+  }
+
+  private boolean shouldRetry(Exception ex) {
+    boolean retry = false;
+    RetryPolicy.RetryAction action = null;
+    try {
+      action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, 
true);
+      LOG.debug("grpc failover retry action {}", action.action);
+      if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
+        retry = false;
+        LOG.error("Retry request failed. " + action.reason, ex);
+      } else {
+        if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
+            (action.action == RetryPolicy.RetryAction.RetryDecision
+                .FAILOVER_AND_RETRY)) {
+          if (action.delayMillis > 0) {
+            try {
+              Thread.sleep(action.delayMillis);
+            } catch (Exception e) {
+              LOG.error("Error trying sleep thread for {}", 
action.delayMillis);
+            }
+          }
+          // switch om host to current proxy OMNodeId
+          host.set(omFailoverProxyProvider
+              .getGrpcProxyAddress(
+                  omFailoverProxyProvider.getCurrentProxyOMNodeId()));
+          retry = true;
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed failover exception {}", e);
+    }
+    return retry;
+  }
+
   // stub implementation for interface
   @Override
   public Text getDelegationTokenService() {
@@ -121,11 +242,15 @@ public class GrpcOmTransport implements OmTransport {
   }
 
   public void shutdown() {
-    channel.shutdown();
-    try {
-      channel.awaitTermination(5, TimeUnit.SECONDS);
-    } catch (Exception e) {
-      LOG.error("failed to shutdown OzoneManagerServiceGrpc channel", e);
+    for (Map.Entry<String, ManagedChannel> entry : channels.entrySet()) {
+      ManagedChannel channel = entry.getValue();
+      channel.shutdown();
+      try {
+        channel.awaitTermination(5, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        LOG.error("failed to shutdown OzoneManagerServiceGrpc channel {} : {}",
+            entry.getKey(), e);
+      }
     }
   }
 
@@ -156,9 +281,16 @@ public class GrpcOmTransport implements OmTransport {
   }
 
   @VisibleForTesting
-  public void startClient(ManagedChannel testChannel) {
-    client = OzoneManagerServiceGrpc.newBlockingStub(testChannel);
+  public void startClient(ManagedChannel testChannel) throws IOException {
+    List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
+    for (String nodeId : nodes) {
+      String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
 
+      clients.put(hostaddr,
+          OzoneManagerServiceGrpc
+              .newBlockingStub(testChannel));
+    }
     LOG.info("{}: started", CLIENT_NAME);
   }
+
 }
diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index 323bb0eeb3..b427db5562 100644
--- 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++ 
b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -25,25 +25,29 @@ import static org.mockito.Mockito.mock;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.testing.GrpcCleanupRule;
+import io.grpc.ManagedChannel;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
 import org.apache.hadoop.security.UserGroupInformation;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
-import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import java.io.IOException;
 
-import io.grpc.ManagedChannel;
+import com.google.protobuf.ServiceException;
+import org.apache.ratis.protocol.RaftPeerId;
 
-import static 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
+import static org.junit.Assert.fail;
 
 /**
  * Tests for GrpcOmTransport client.
@@ -59,11 +63,32 @@ public class TestS3GrpcOmTransport {
 
   private final OMResponse omResponse = OMResponse.newBuilder()
                   .setSuccess(true)
-                  .setStatus(Status.OK)
+                  .setStatus(org.apache.hadoop.ozone.protocol
+                      .proto.OzoneManagerProtocolProtos.Status.OK)
                   .setLeaderOMNodeId(leaderOMNodeId)
                   .setCmdType(Type.AllocateBlock)
                   .build();
 
+  private boolean doFailover = false;
+
+  private OzoneConfiguration conf;
+
+  private String omServiceId;
+  private UserGroupInformation ugi;
+  private ManagedChannel channel;
+
+
+  private ServiceException createNotLeaderException() {
+    RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId");
+
+    // TODO: Set suggest leaderID. Right now, client is not using suggest
+    // leaderID. Need to fix this.
+    OMNotLeaderException notLeaderException =
+        new OMNotLeaderException(raftPeerId);
+    LOG.debug(notLeaderException.getMessage());
+    return new ServiceException(notLeaderException);
+  }
+
   private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase
       serviceImpl =
         mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
@@ -78,10 +103,22 @@ public class TestS3GrpcOmTransport {
                                               .OzoneManagerProtocolProtos
                                               .OMResponse>
                                           responseObserver) {
-                  responseObserver.onNext(omResponse);
-                  responseObserver.onCompleted();
+                  try {
+                    if (doFailover) {
+                      doFailover = false;
+                      throw createNotLeaderException();
+                    } else {
+                      responseObserver.onNext(omResponse);
+                      responseObserver.onCompleted();
+                    }
+                  } catch (Throwable e) {
+                    IOException ex = new IOException(e.getCause());
+                    responseObserver.onError(io.grpc.Status
+                        .INTERNAL
+                        .withDescription(ex.getMessage())
+                        .asRuntimeException());
+                  }
                 }
-
               }));
 
   private GrpcOmTransport client;
@@ -101,18 +138,37 @@ public class TestS3GrpcOmTransport {
         .start());
 
     // Create a client channel and register for automatic graceful shutdown.
-    ManagedChannel channel = grpcCleanup.register(
+    channel = grpcCleanup.register(
         InProcessChannelBuilder.forName(serverName).directExecutor().build());
 
-    String omServiceId = "";
-    OzoneConfiguration conf = new OzoneConfiguration();
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+    omServiceId = "";
+    conf = new OzoneConfiguration();
+    ugi = UserGroupInformation.getCurrentUser();
+    doFailover = false;
+  }
+
+  @Test
+  public void testSubmitRequestToServer() throws Exception {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    final OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setVersion(CURRENT_VERSION)
+        .setClientId("test")
+        .setServiceListRequest(req)
+        .build();
+
     client = new GrpcOmTransport(conf, ugi, omServiceId);
     client.startClient(channel);
+
+    final OMResponse resp = client.submitRequest(omRequest);
+    Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
+        .proto.OzoneManagerProtocolProtos.Status.OK);
+    Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
   }
 
   @Test
-  public void testSubmitRequestToServer() throws Exception {
+  public void testGrpcFailoverProxy() throws Exception {
     ServiceListRequest req = ServiceListRequest.newBuilder().build();
 
     final OMRequest omRequest = OMRequest.newBuilder()
@@ -122,8 +178,45 @@ public class TestS3GrpcOmTransport {
         .setServiceListRequest(req)
         .build();
 
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+
+    doFailover = true;
+    // first invocation generates an OMNotLeaderException
+    // failover is performed and request is internally retried
+    // second invocation request to server succeeds
     final OMResponse resp = client.submitRequest(omRequest);
-    Assert.assertEquals(resp.getStatus(), OK);
+    Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
+        .proto.OzoneManagerProtocolProtos.Status.OK);
     Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
   }
+
+  @Test
+  public void testGrpcFailoverProxyExhaustRetry() throws Exception {
+    ServiceListRequest req = ServiceListRequest.newBuilder().build();
+
+    final OMRequest omRequest = OMRequest.newBuilder()
+        .setCmdType(Type.ServiceList)
+        .setVersion(CURRENT_VERSION)
+        .setClientId("test")
+        .setServiceListRequest(req)
+        .build();
+
+    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 0);
+    client = new GrpcOmTransport(conf, ugi, omServiceId);
+    client.startClient(channel);
+
+    doFailover = true;
+    // first invocation generates an OMNotLeaderException
+    // failover is performed and request is internally retried
+    // OMFailoverProvider returns Fail retry due to #attempts >
+    // max failovers
+
+    try {
+      final OMResponse resp = client.submitRequest(omRequest);
+      fail();
+    } catch (Exception e) {
+      Assert.assertTrue(true);
+    }
+  }
 }
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config 
b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
index 69f4e52eae..4642680394 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
@@ -36,6 +36,7 @@ OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
 OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true
 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
 ASYNC_PROFILER_HOME=/opt/profiler
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config 
b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index 498d02efae..be93d0a6ec 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -51,6 +51,7 @@ OZONE-SITE.XML_hdds.grpc.tls.enabled=true
 OZONE-SITE.XML_ozone.replication=3
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_hdds.container.report.interval=60s
+OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 
 OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh 
b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
index 7410822cfa..252f953163 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
@@ -35,7 +35,7 @@ execute_robot_test ${SCM} freon
 
 execute_robot_test ${SCM} basic/links.robot
 
-#execute_robot_test ${SCM} s3
+execute_robot_test ${SCM} s3
 
 execute_robot_test ${SCM} admincli
 
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 3269c394f7..1c772cf46b 100644
--- 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -111,7 +111,8 @@ public class TestOzoneConfigurationFields extends 
TestConfigurationFieldsBase {
         ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM,
         OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
         OMConfigKeys.OZONE_OM_HA_PREFIX,
-        OMConfigKeys.OZONE_OM_TRANSPORT_CLASS
+        OMConfigKeys.OZONE_OM_TRANSPORT_CLASS,
+        OMConfigKeys.OZONE_OM_GRPC_PORT_KEY
         // TODO HDDS-2856
     ));
   }
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
index 60942f971b..7fe338c83e 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -18,13 +18,16 @@
 package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
+import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.ha.ConfUtils;
 import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
 import io.grpc.Server;
@@ -47,9 +50,20 @@ public class GrpcOzoneManagerServer {
                                     omTranslator,
                                 OzoneDelegationTokenSecretManager
                                     delegationTokenMgr) {
-    this.port = config.getObject(
-        GrpcOzoneManagerServerConfig.class).
-        getPort();
+    OptionalInt haPort = HddsUtils.getNumberFromConfigKeys(config,
+        ConfUtils.addKeySuffixes(
+            OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
+            config.get(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY),
+            config.get(OMConfigKeys.OZONE_OM_NODE_ID_KEY)),
+        OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
+    if (haPort.isPresent()) {
+      this.port = haPort.getAsInt();
+    } else {
+      this.port = config.getObject(
+              GrpcOzoneManagerServerConfig.class).
+          getPort();
+    }
+
     init(omTranslator,
         delegationTokenMgr,
         config);
diff --git 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
index de11608703..a88e259a28 100644
--- 
a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
+++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.ozone.om;
 
+import io.grpc.Status;
 import com.google.protobuf.RpcController;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
-import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import 
org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase;
 import 
org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.protocol.proto
@@ -68,7 +68,6 @@ public class OzoneManagerServiceGrpc extends 
OzoneManagerServiceImplBase {
         "processing s3g client submit request - for command {}",
         request.getCmdType().name());
     AtomicInteger callCount = new AtomicInteger(0);
-    OMResponse omResponse = null;
 
     org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1,
         callCount.incrementAndGet(),
@@ -84,42 +83,16 @@ public class OzoneManagerServiceGrpc extends 
OzoneManagerServiceImplBase {
     // for OMRequests.  Test through successful ratis-enabled OMRequest
     // handling without dependency on hadoop IPC based Server.
     try {
-      omResponse = this.omTranslator.
+      OMResponse omResponse = this.omTranslator.
           submitRequest(NULL_RPC_CONTROLLER, request);
+      responseObserver.onNext(omResponse);
     } catch (Throwable e) {
-      IOException ioe = null;
-      Throwable se = e.getCause();
-      if (se == null) {
-        ioe = new IOException(e);
-      } else {
-        ioe = se instanceof IOException ?
-            (IOException) se : new IOException(e);
-      }
-      omResponse = createErrorResponse(
-          request,
-          ioe);
+      IOException ex = new IOException(e.getCause());
+      responseObserver.onError(Status
+          .INTERNAL
+          .withDescription(ex.getMessage())
+          .asRuntimeException());
     }
-    responseObserver.onNext(omResponse);
     responseObserver.onCompleted();
   }
-
-  /**
-   * Create OMResponse from the specified OMRequest and exception.
-   *
-   * @param omRequest
-   * @param exception
-   * @return OMResponse
-   */
-  private OMResponse createErrorResponse(
-      OMRequest omRequest, IOException exception) {
-    OMResponse.Builder omResponse = OMResponse.newBuilder()
-        .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
-        .setCmdType(omRequest.getCmdType())
-        .setTraceID(omRequest.getTraceID())
-        .setSuccess(false);
-    if (exception.getMessage() != null) {
-      omResponse.setMessage(exception.getMessage());
-    }
-    return omResponse.build();
-  }
 }
diff --git 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index 01601668b6..fe7f6f49ea 100644
--- 
a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ 
b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -143,7 +143,7 @@ public class TestOMFailovers {
         omProxyInfos.put(nodeId, null);
         omNodeIDList.add(nodeId);
       }
-      setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList);
+      setProxies(omProxies, omProxyInfos, omNodeIDList);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to