This is an automated email from the ASF dual-hosted git repository.

adoroszlai pushed a commit to branch HDDS-4440-s3-performance
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit d57616e165a6500e626d82159fefb95907db808d
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Mon Apr 11 19:12:00 2022 +0200

    Revert "HDDS-5544. Update GRPC OmTransport implementation for HA (#2901)"
    
    This reverts commit 413d4aade25e77b444fcbbea36a3302cd2a5dc66.
---
 .../java/org/apache/hadoop/hdds/HddsUtils.java     |  20 ---
 .../java/org/apache/hadoop/hdds/TestHddsUtils.java |  39 +---
 .../org/apache/hadoop/ozone/om/OMConfigKeys.java   |   3 +-
 .../ozone/om/ha/GrpcOMFailoverProxyProvider.java   | 143 ---------------
 .../ozone/om/ha/OMFailoverProxyProvider.java       |  22 ++-
 .../ozone/om/protocolPB/GrpcOmTransport.java       | 196 ++++-----------------
 .../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 119 ++-----------
 .../src/main/compose/ozone-om-ha/docker-config     |   1 -
 .../src/main/compose/ozonesecure-ha/docker-config  |   1 -
 .../dist/src/main/compose/ozonesecure-ha/test.sh   |   2 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |   3 +-
 .../hadoop/ozone/om/GrpcOzoneManagerServer.java    |  20 +--
 .../hadoop/ozone/om/OzoneManagerServiceGrpc.java   |  43 ++++-
 .../hadoop/ozone/om/failover/TestOMFailovers.java  |   2 +-
 14 files changed, 98 insertions(+), 516 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
index 364377d396..ffbb3e3340 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java
@@ -227,26 +227,6 @@ public final class HddsUtils {
     }
   }
 
-  /**
-   * Retrieve a number, trying the supplied config keys in order.
-   * Each config value may be absent
-   *
-   * @param conf Conf
-   * @param keys a list of configuration key names.
-   *
-   * @return first number found from the given keys, or absent.
-   */
-  public static OptionalInt getNumberFromConfigKeys(
-      ConfigurationSource conf, String... keys) {
-    for (final String key : keys) {
-      final String value = conf.getTrimmed(key);
-      if (value != null) {
-        return OptionalInt.of(Integer.parseInt(value));
-      }
-    }
-    return OptionalInt.empty();
-  }
-
   /**
    * Retrieve the port number, trying the supplied config keys in order.
    * Each config value may be absent, or if present in the format
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
index 67001010d5..fd8aa28e63 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java
@@ -36,8 +36,6 @@ import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
-
 import static org.hamcrest.core.Is.is;
 import org.junit.Assert;
 import static org.junit.Assert.assertThat;
@@ -218,39 +216,4 @@ public class TestHddsUtils {
 
   }
 
-  @Test
-  public void testGetNumberFromConfigKeys() {
-    final String testnum1 = "8";
-    final String testnum2 = "7";
-    final String serviceId = "id1";
-    final String nodeId = "scm1";
-
-    OzoneConfiguration conf = new OzoneConfiguration();
-    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
-        testnum1);
-    Assert.assertTrue(Integer.parseInt(testnum1) ==
-        HddsUtils.getNumberFromConfigKeys(
-            conf,
-            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
-
-    /* Test to return first unempty key number from list */
-    /* first key is absent */
-    Assert.assertTrue(Integer.parseInt(testnum1) ==
-        HddsUtils.getNumberFromConfigKeys(
-            conf,
-            ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
-                serviceId, nodeId),
-            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
-
-    /* now set the empty key and ensure returned value from this key */
-    conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
-            serviceId, nodeId),
-        testnum2);
-    Assert.assertTrue(Integer.parseInt(testnum2) ==
-        HddsUtils.getNumberFromConfigKeys(
-            conf,
-            ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY,
-                serviceId, nodeId),
-            OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0));
-  }
-}
+}
\ No newline at end of file
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 6ebd7e11ad..cdd9e52667 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -57,8 +57,7 @@ public final class OMConfigKeys {
   public static final String OZONE_OM_BIND_HOST_DEFAULT =
       "0.0.0.0";
   public static final int OZONE_OM_PORT_DEFAULT = 9862;
-  public static final String OZONE_OM_GRPC_PORT_KEY =
-      "ozone.om.grpc.port";
+
   public static final String OZONE_OM_HTTP_ENABLED_KEY =
       "ozone.om.http.enabled";
   public static final String OZONE_OM_HTTP_BIND_HOST_KEY =
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
deleted file mode 100644
index 498f935974..0000000000
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.om.ha;
-
-import org.apache.hadoop.hdds.conf.ConfigurationException;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.HddsUtils;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.ha.ConfUtils;
-import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport;
-import org.apache.hadoop.security.UserGroupInformation;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.OptionalInt;
-
-import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-
-/**
- * The Grpc s3gateway om transport failover proxy provider implementation
- * extending the ozone client OM failover proxy provider.  This implmentation
- * allows the Grpc OMTransport reuse OM failover retry policies and
- * getRetryAction methods.  In case of OM failover, client can try
- * connecting to another OM node from the list of proxies.
- */
-public class GrpcOMFailoverProxyProvider<T> extends
-    OMFailoverProxyProvider<T> {
-
-  private Map<String, String> omAddresses;
-
-  public GrpcOMFailoverProxyProvider(ConfigurationSource configuration,
-                                     UserGroupInformation ugi,
-                                     String omServiceId,
-                                     Class<T> protocol) throws IOException {
-    super(configuration, ugi, omServiceId, protocol);
-  }
-
-  @Override
-  protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId)
-      throws IOException {
-    // to be used for base class omProxies,
-    // ProxyInfo not applicable for gRPC, just need key set
-    Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>();
-    // to be used for base class omProxyInfos
-    // OMProxyInfo not applicable for gRPC, just need key set
-    Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>();
-    List<String> omNodeIDList = new ArrayList<>();
-    omAddresses = new HashMap<>();
-
-    Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId);
-
-    for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
-      String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
-          omSvcId, nodeId);
-
-      Optional<String> hostaddr = getHostNameFromConfigKeys(config,
-          rpcAddrKey);
-
-      OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config,
-          ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
-              omSvcId, nodeId),
-          OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
-      if (nodeId == null) {
-        nodeId = OzoneConsts.OM_DEFAULT_NODE_ID;
-      }
-      omProxiesNodeIdKeyset.put(nodeId, null);
-      omProxyInfosNodeIdKeyset.put(nodeId, null);
-      if (hostaddr.isPresent()) {
-        omAddresses.put(nodeId,
-            hostaddr.get() + ":"
-                + hostport.orElse(config
-                .getObject(GrpcOmTransport
-                    .GrpcOmTransportConfig.class)
-                .getPort()));
-      } else {
-        LOG.error("expected host address not defined for: {}", rpcAddrKey);
-        throw new ConfigurationException(rpcAddrKey + "is not defined");
-      }
-      omNodeIDList.add(nodeId);
-    }
-
-    if (omProxiesNodeIdKeyset.isEmpty()) {
-      throw new IllegalArgumentException("Could not find any configured " +
-          "addresses for OM. Please configure the system with "
-          + OZONE_OM_ADDRESS_KEY);
-    }
-
-    // set base class omProxies, omProxyInfos, omNodeIDList
-
-    // omProxies needed in base class
-    // omProxies.size == number of om nodes
-    // omProxies key needs to be valid nodeid
-    // omProxyInfos keyset needed in base class
-    setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList);
-  }
-
-  @Override
-  protected Text computeDelegationTokenService() {
-    return new Text();
-  }
-
-  // need to throw if nodeID not in omAddresses
-  public String getGrpcProxyAddress(String nodeId) throws IOException {
-    if (omAddresses.containsKey(nodeId)) {
-      return omAddresses.get(nodeId);
-    } else {
-      LOG.error("expected nodeId not found in omAddresses for proxyhost {}",
-          nodeId);
-      throw new IOException(
-          "expected nodeId not found in omAddresses for proxyhost");
-    }
-
-  }
-
-  public List<String> getGrpcOmNodeIDList() {
-    return getOmNodeIDList();
-  }
-}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
index 9fb690e760..5432468452 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
@@ -148,6 +148,8 @@ public class OMFailoverProxyProvider<T> implements
             rpcAddrStr);
 
         if (omProxyInfo.getAddress() != null) {
+
+
           // For a non-HA OM setup, nodeId might be null. If so, we assign it
           // the default value
           if (nodeId == null) {
@@ -549,18 +551,14 @@ public class OMFailoverProxyProvider<T> implements
     return null;
   }
 
-  protected void setProxies(
-      Map<String, ProxyInfo<T>> setOMProxies,
-      Map<String, OMProxyInfo> setOMProxyInfos,
-      List<String> setOMNodeIDList) {
-    this.omProxies = setOMProxies;
-    this.omProxyInfos = setOMProxyInfos;
-    this.omNodeIDList = setOMNodeIDList;
-  }
-
-  protected List<String> getOmNodeIDList() {
-    return omNodeIDList;
+  @VisibleForTesting
+  protected void setProxiesForTesting(
+      Map<String, ProxyInfo<T>> testOMProxies,
+      Map<String, OMProxyInfo> testOMProxyInfos,
+      List<String> testOMNodeIDList) {
+    this.omProxies = testOMProxies;
+    this.omProxyInfos = testOMProxyInfos;
+    this.omNodeIDList = testOMNodeIDList;
   }
-
 }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
index 72c29f0cc6..3607429e52 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java
@@ -18,34 +18,22 @@
 package org.apache.hadoop.ozone.om.protocolPB;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
+import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
 
-import com.google.common.net.HostAndPort;
 import io.grpc.Status;
-import io.grpc.StatusRuntimeException;
-import org.apache.hadoop.ipc.RemoteException;
-
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.security.UserGroupInformation;
 
-import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
 import io.grpc.ManagedChannel;
 import io.grpc.netty.NettyChannelBuilder;
@@ -54,10 +42,12 @@ import com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH;
 import static org.apache.hadoop.ozone.om.OMConfigKeys
     .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT;
+import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys;
 
 /**
  * Grpc transport for grpc between s3g and om.
@@ -70,169 +60,58 @@ public class GrpcOmTransport implements OmTransport {
   private final AtomicBoolean isRunning = new AtomicBoolean(false);
 
   // gRPC specific
+  private ManagedChannel channel;
+
   private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client;
-  private Map<String,
-      OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients;
-  private Map<String, ManagedChannel> channels;
-  private int lastVisited = -1;
-  private ConfigurationSource conf;
 
-  //private String host = "om";
-  private AtomicReference<String> host;
+  private String host = "om";
+  private int port = 8981;
   private int maxSize;
 
-  private List<String> oms;
-  private RetryPolicy retryPolicy;
-  private int failoverCount = 0;
-  private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB>
-      omFailoverProxyProvider;
-
   public GrpcOmTransport(ConfigurationSource conf,
                           UserGroupInformation ugi, String omServiceId)
       throws IOException {
+    Optional<String> omHost = getHostNameFromConfigKeys(conf,
+        OZONE_OM_ADDRESS_KEY);
+    this.host = omHost.orElse("0.0.0.0");
 
-    this.channels = new HashMap<>();
-    this.clients = new HashMap<>();
-    this.conf = conf;
-    this.host = new AtomicReference();
+    port = conf.getObject(GrpcOmTransportConfig.class).getPort();
 
     maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH,
         OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT);
 
-    omFailoverProxyProvider = new GrpcOMFailoverProxyProvider(
-        conf,
-        ugi,
-        omServiceId,
-        OzoneManagerProtocolPB.class);
-
     start();
   }
 
-  public void start() throws IOException {
-    host.set(omFailoverProxyProvider
-        .getGrpcProxyAddress(
-            omFailoverProxyProvider.getCurrentProxyOMNodeId()));
-
+  public void start() {
     if (!isRunning.compareAndSet(false, true)) {
       LOG.info("Ignore. already started.");
       return;
     }
+    NettyChannelBuilder channelBuilder =
+        NettyChannelBuilder.forAddress(host, port)
+            .usePlaintext()
+            .maxInboundMessageSize(maxSize);
 
-    List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
-    for (String nodeId : nodes) {
-      String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
-      HostAndPort hp = HostAndPort.fromString(hostaddr);
-
-      NettyChannelBuilder channelBuilder =
-          NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort())
-              .usePlaintext()
-              .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE);
-      channels.put(hostaddr, channelBuilder.build());
-      clients.put(hostaddr,
-          OzoneManagerServiceGrpc
-              .newBlockingStub(channels.get(hostaddr)));
-    }
-    int maxFailovers = conf.getInt(
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+    channel = channelBuilder.build();
+    client = OzoneManagerServiceGrpc.newBlockingStub(channel);
 
-
-    retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers);
     LOG.info("{}: started", CLIENT_NAME);
   }
 
   @Override
   public OMResponse submitRequest(OMRequest payload) throws IOException {
     OMResponse resp = null;
-    boolean tryOtherHost = true;
-    ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
-    while (tryOtherHost) {
-      tryOtherHost = false;
-      try {
-        resp = clients.get(host.get()).submitRequest(payload);
-      } catch (StatusRuntimeException e) {
-        if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
-          resultCode = ResultCodes.TIMEOUT;
-        }
-        Exception exp = new Exception(e);
-        tryOtherHost = shouldRetry(unwrapException(exp));
-        if (!tryOtherHost) {
-          throw new OMException(resultCode);
-        }
-      }
-    }
-    return resp;
-  }
-
-  private Exception unwrapException(Exception ex) {
-    Exception grpcException = null;
     try {
-      StatusRuntimeException srexp =
-          (StatusRuntimeException)ex.getCause();
-      Status status = srexp.getStatus();
-      LOG.debug("GRPC exception wrapped: {}", status.getDescription());
-      if (status.getCode() == Status.Code.INTERNAL) {
-        // exception potentially generated by OzoneManagerServiceGrpc
-        Class<?> realClass = Class.forName(status.getDescription()
-            .substring(0, status.getDescription()
-                .indexOf(":")));
-        Class<? extends Exception> cls = realClass
-            .asSubclass(Exception.class);
-        Constructor<? extends Exception> cn = cls.getConstructor(String.class);
-        cn.setAccessible(true);
-        grpcException = cn.newInstance(status.getDescription());
-        IOException remote = null;
-        try {
-          String cause = status.getDescription();
-          cause = cause.substring(cause.indexOf(":") + 2);
-          remote = new RemoteException(cause.substring(0, cause.indexOf(":")),
-              cause.substring(cause.indexOf(":") + 1));
-          grpcException.initCause(remote);
-        } catch (Exception e) {
-          LOG.error("cannot get cause for remote exception");
-        }
-      } else {
-        // exception generated by connection failure, gRPC
-        grpcException = ex;
+      resp = client.submitRequest(payload);
+    } catch (io.grpc.StatusRuntimeException e) {
+      ResultCodes resultCode = ResultCodes.INTERNAL_ERROR;
+      if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
+        resultCode = ResultCodes.TIMEOUT;
       }
-    } catch (Exception e) {
-      grpcException = new IOException(e);
-      LOG.error("error unwrapping exception from OMResponse {}");
-    }
-    return grpcException;
-  }
-
-  private boolean shouldRetry(Exception ex) {
-    boolean retry = false;
-    RetryPolicy.RetryAction action = null;
-    try {
-      action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true);
-      LOG.debug("grpc failover retry action {}", action.action);
-      if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) {
-        retry = false;
-        LOG.error("Retry request failed. " + action.reason, ex);
-      } else {
-        if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY ||
-            (action.action == RetryPolicy.RetryAction.RetryDecision
-                .FAILOVER_AND_RETRY)) {
-          if (action.delayMillis > 0) {
-            try {
-              Thread.sleep(action.delayMillis);
-            } catch (Exception e) {
-              LOG.error("Error trying sleep thread for {}", action.delayMillis);
-            }
-          }
-          // switch om host to current proxy OMNodeId
-          host.set(omFailoverProxyProvider
-              .getGrpcProxyAddress(
-                  omFailoverProxyProvider.getCurrentProxyOMNodeId()));
-          retry = true;
-        }
-      }
-    } catch (Exception e) {
-      LOG.error("Failed failover exception {}", e);
+      throw new OMException(e.getCause(), resultCode);
     }
-    return retry;
+    return resp;
   }
 
   // stub implementation for interface
@@ -242,15 +121,11 @@ public class GrpcOmTransport implements OmTransport {
   }
 
   public void shutdown() {
-    for (Map.Entry<String, ManagedChannel> entry : channels.entrySet()) {
-      ManagedChannel channel = entry.getValue();
-      channel.shutdown();
-      try {
-        channel.awaitTermination(5, TimeUnit.SECONDS);
-      } catch (Exception e) {
-        LOG.error("failed to shutdown OzoneManagerServiceGrpc channel {} : {}",
-            entry.getKey(), e);
-      }
+    channel.shutdown();
+    try {
+      channel.awaitTermination(5, TimeUnit.SECONDS);
+    } catch (Exception e) {
+      LOG.error("failed to shutdown OzoneManagerServiceGrpc channel", e);
     }
   }
 
@@ -281,16 +156,9 @@ public class GrpcOmTransport implements OmTransport {
   }
 
   @VisibleForTesting
-  public void startClient(ManagedChannel testChannel) throws IOException {
-    List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList();
-    for (String nodeId : nodes) {
-      String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId);
+  public void startClient(ManagedChannel testChannel) {
+    client = OzoneManagerServiceGrpc.newBlockingStub(testChannel);
 
-      clients.put(hostaddr,
-          OzoneManagerServiceGrpc
-              .newBlockingStub(testChannel));
-    }
     LOG.info("{}: started", CLIENT_NAME);
   }
-
 }
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
index b427db5562..323bb0eeb3 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
+++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java
@@ -25,29 +25,25 @@ import static org.mockito.Mockito.mock;
 import io.grpc.inprocess.InProcessChannelBuilder;
 import io.grpc.inprocess.InProcessServerBuilder;
 import io.grpc.testing.GrpcCleanupRule;
-import io.grpc.ManagedChannel;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.Before;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.IOException;
 
-import com.google.protobuf.ServiceException;
-import org.apache.ratis.protocol.RaftPeerId;
+import io.grpc.ManagedChannel;
 
-import static org.junit.Assert.fail;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK;
 
 /**
  * Tests for GrpcOmTransport client.
@@ -63,32 +59,11 @@ public class TestS3GrpcOmTransport {
 
   private final OMResponse omResponse = OMResponse.newBuilder()
                   .setSuccess(true)
-                  .setStatus(org.apache.hadoop.ozone.protocol
-                      .proto.OzoneManagerProtocolProtos.Status.OK)
+                  .setStatus(Status.OK)
                   .setLeaderOMNodeId(leaderOMNodeId)
                   .setCmdType(Type.AllocateBlock)
                   .build();
 
-  private boolean doFailover = false;
-
-  private OzoneConfiguration conf;
-
-  private String omServiceId;
-  private UserGroupInformation ugi;
-  private ManagedChannel channel;
-
-
-  private ServiceException createNotLeaderException() {
-    RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId");
-
-    // TODO: Set suggest leaderID. Right now, client is not using suggest
-    // leaderID. Need to fix this.
-    OMNotLeaderException notLeaderException =
-        new OMNotLeaderException(raftPeerId);
-    LOG.debug(notLeaderException.getMessage());
-    return new ServiceException(notLeaderException);
-  }
-
   private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase
       serviceImpl =
         mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class,
@@ -103,22 +78,10 @@ public class TestS3GrpcOmTransport {
                                               .OzoneManagerProtocolProtos
                                               .OMResponse>
                                           responseObserver) {
-                  try {
-                    if (doFailover) {
-                      doFailover = false;
-                      throw createNotLeaderException();
-                    } else {
-                      responseObserver.onNext(omResponse);
-                      responseObserver.onCompleted();
-                    }
-                  } catch (Throwable e) {
-                    IOException ex = new IOException(e.getCause());
-                    responseObserver.onError(io.grpc.Status
-                        .INTERNAL
-                        .withDescription(ex.getMessage())
-                        .asRuntimeException());
-                  }
+                  responseObserver.onNext(omResponse);
+                  responseObserver.onCompleted();
                 }
+
               }));
 
   private GrpcOmTransport client;
@@ -138,37 +101,18 @@ public class TestS3GrpcOmTransport {
         .start());
 
     // Create a client channel and register for automatic graceful shutdown.
-    channel = grpcCleanup.register(
+    ManagedChannel channel = grpcCleanup.register(
         InProcessChannelBuilder.forName(serverName).directExecutor().build());
 
-    omServiceId = "";
-    conf = new OzoneConfiguration();
-    ugi = UserGroupInformation.getCurrentUser();
-    doFailover = false;
-  }
-
-  @Test
-  public void testSubmitRequestToServer() throws Exception {
-    ServiceListRequest req = ServiceListRequest.newBuilder().build();
-
-    final OMRequest omRequest = OMRequest.newBuilder()
-        .setCmdType(Type.ServiceList)
-        .setVersion(CURRENT_VERSION)
-        .setClientId("test")
-        .setServiceListRequest(req)
-        .build();
-
+    String omServiceId = "";
+    OzoneConfiguration conf = new OzoneConfiguration();
+    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
     client = new GrpcOmTransport(conf, ugi, omServiceId);
     client.startClient(channel);
-
-    final OMResponse resp = client.submitRequest(omRequest);
-    Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
-        .proto.OzoneManagerProtocolProtos.Status.OK);
-    Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
   }
 
   @Test
-  public void testGrpcFailoverProxy() throws Exception {
+  public void testSubmitRequestToServer() throws Exception {
     ServiceListRequest req = ServiceListRequest.newBuilder().build();
 
     final OMRequest omRequest = OMRequest.newBuilder()
@@ -178,45 +122,8 @@ public class TestS3GrpcOmTransport {
         .setServiceListRequest(req)
         .build();
 
-    client = new GrpcOmTransport(conf, ugi, omServiceId);
-    client.startClient(channel);
-
-    doFailover = true;
-    // first invocation generates a NotALeaderException
-    // failover is performed and request is internally retried
-    // second invocation request to server succeeds
     final OMResponse resp = client.submitRequest(omRequest);
-    Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol
-        .proto.OzoneManagerProtocolProtos.Status.OK);
+    Assert.assertEquals(resp.getStatus(), OK);
     Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId);
   }
-
-  @Test
-  public void testGrpcFailoverProxyExhaustRetry() throws Exception {
-    ServiceListRequest req = ServiceListRequest.newBuilder().build();
-
-    final OMRequest omRequest = OMRequest.newBuilder()
-        .setCmdType(Type.ServiceList)
-        .setVersion(CURRENT_VERSION)
-        .setClientId("test")
-        .setServiceListRequest(req)
-        .build();
-
-    conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 0);
-    client = new GrpcOmTransport(conf, ugi, omServiceId);
-    client.startClient(channel);
-
-    doFailover = true;
-    // first invocation generates a NotALeaderException
-    // failover is performed and request is internally retried
-    // OMFailoverProvider returns Fail retry due to #attempts >
-    // max failovers
-
-    try {
-      final OMResponse resp = client.submitRequest(omRequest);
-      fail();
-    } catch (Exception e) {
-      Assert.assertTrue(true);
-    }
-  }
 }
diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
index 4642680394..69f4e52eae 100644
--- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config
@@ -36,7 +36,6 @@ OZONE-SITE.XML_hdds.datanode.dir=/data/hdds
 OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_hdds.container.report.interval=60s
-OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 HDFS-SITE.XML_rpc.metrics.quantile.enable=true
 HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300
 ASYNC_PROFILER_HOME=/opt/profiler
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
index be93d0a6ec..498d02efae 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config
@@ -51,7 +51,6 @@ OZONE-SITE.XML_hdds.grpc.tls.enabled=true
 OZONE-SITE.XML_ozone.replication=3
 OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s
 OZONE-SITE.XML_hdds.container.report.interval=60s
-OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true
 
 OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m
 OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
index 252f953163..7410822cfa 100755
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh
@@ -35,7 +35,7 @@ execute_robot_test ${SCM} freon
 
 execute_robot_test ${SCM} basic/links.robot
 
-execute_robot_test ${SCM} s3
+#execute_robot_test ${SCM} s3
 
 execute_robot_test ${SCM} admincli
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 1c772cf46b..3269c394f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@ -111,8 +111,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase {
         ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM,
         OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY,
         OMConfigKeys.OZONE_OM_HA_PREFIX,
-        OMConfigKeys.OZONE_OM_TRANSPORT_CLASS,
-        OMConfigKeys.OZONE_OM_GRPC_PORT_KEY
+        OMConfigKeys.OZONE_OM_TRANSPORT_CLASS
         // TODO HDDS-2856
     ));
   }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
index 7fe338c83e..60942f971b 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java
@@ -18,16 +18,13 @@
 package org.apache.hadoop.ozone.om;
 
 import java.io.IOException;
-import java.util.OptionalInt;
 import java.util.concurrent.TimeUnit;
 
-import org.apache.hadoop.hdds.HddsUtils;
 import org.apache.hadoop.hdds.conf.Config;
 import org.apache.hadoop.hdds.conf.ConfigGroup;
 import org.apache.hadoop.hdds.conf.ConfigTag;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.ha.ConfUtils;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager;
 import io.grpc.Server;
@@ -50,20 +47,9 @@ public class GrpcOzoneManagerServer {
                                     omTranslator,
                                 OzoneDelegationTokenSecretManager
                                     delegationTokenMgr) {
-    OptionalInt haPort = HddsUtils.getNumberFromConfigKeys(config,
-        ConfUtils.addKeySuffixes(
-            OMConfigKeys.OZONE_OM_GRPC_PORT_KEY,
-            config.get(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY),
-            config.get(OMConfigKeys.OZONE_OM_NODE_ID_KEY)),
-        OMConfigKeys.OZONE_OM_GRPC_PORT_KEY);
-    if (haPort.isPresent()) {
-      this.port = haPort.getAsInt();
-    } else {
-      this.port = config.getObject(
-              GrpcOzoneManagerServerConfig.class).
-          getPort();
-    }
-
+    this.port = config.getObject(
+        GrpcOzoneManagerServerConfig.class).
+        getPort();
     init(omTranslator,
         delegationTokenMgr,
         config);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
index a88e259a28..de11608703 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java
@@ -17,13 +17,13 @@
  */
 package org.apache.hadoop.ozone.om;
 
-import io.grpc.Status;
 import com.google.protobuf.RpcController;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ipc.ClientId;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase;
 import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.protocol.proto
@@ -68,6 +68,7 @@ public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase {
         "processing s3g client submit request - for command {}",
         request.getCmdType().name());
     AtomicInteger callCount = new AtomicInteger(0);
+    OMResponse omResponse = null;
 
     org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1,
         callCount.incrementAndGet(),
@@ -83,16 +84,42 @@ public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase {
     // for OMRequests.  Test through successful ratis-enabled OMRequest
     // handling without dependency on hadoop IPC based Server.
     try {
-      OMResponse omResponse = this.omTranslator.
+      omResponse = this.omTranslator.
           submitRequest(NULL_RPC_CONTROLLER, request);
-      responseObserver.onNext(omResponse);
     } catch (Throwable e) {
-      IOException ex = new IOException(e.getCause());
-      responseObserver.onError(Status
-          .INTERNAL
-          .withDescription(ex.getMessage())
-          .asRuntimeException());
+      IOException ioe = null;
+      Throwable se = e.getCause();
+      if (se == null) {
+        ioe = new IOException(e);
+      } else {
+        ioe = se instanceof IOException ?
+            (IOException) se : new IOException(e);
+      }
+      omResponse = createErrorResponse(
+          request,
+          ioe);
     }
+    responseObserver.onNext(omResponse);
     responseObserver.onCompleted();
   }
+
+  /**
+   * Build a failed OMResponse from the given OMRequest and exception.
+   *
+   * @param omRequest request whose command type and trace ID are copied
+   * @param exception mapped to the response status; message kept if set
+   * @return OMResponse with success=false describing the failure
+   */
+  private OMResponse createErrorResponse(
+      OMRequest omRequest, IOException exception) {
+    OMResponse.Builder omResponse = OMResponse.newBuilder()
+        .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception))
+        .setCmdType(omRequest.getCmdType())
+        .setTraceID(omRequest.getTraceID())
+        .setSuccess(false);
+    if (exception.getMessage() != null) {
+      omResponse.setMessage(exception.getMessage());
+    }
+    return omResponse.build();
+  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
index fe7f6f49ea..01601668b6 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java
@@ -143,7 +143,7 @@ public class TestOMFailovers {
         omProxyInfos.put(nodeId, null);
         omNodeIDList.add(nodeId);
       }
-      setProxies(omProxies, omProxyInfos, omNodeIDList);
+      setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to