This is an automated email from the ASF dual-hosted git repository. adoroszlai pushed a commit to branch HDDS-4440-s3-performance in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 76e238118de06bfa6f4c997b92ae57baa79f5a1b Author: Neil Joshi <[email protected]> AuthorDate: Wed Mar 9 12:19:01 2022 -0700 HDDS-5544. Update GRPC OmTransport implementation for HA (#2901) --- .../java/org/apache/hadoop/hdds/HddsUtils.java | 20 +++ .../java/org/apache/hadoop/hdds/TestHddsUtils.java | 39 +++- .../org/apache/hadoop/ozone/om/OMConfigKeys.java | 3 +- .../ozone/om/ha/GrpcOMFailoverProxyProvider.java | 143 +++++++++++++++ .../ozone/om/ha/OMFailoverProxyProvider.java | 22 +-- .../ozone/om/protocolPB/GrpcOmTransport.java | 196 +++++++++++++++++---- .../ozone/om/protocolPB/TestS3GrpcOmTransport.java | 119 +++++++++++-- .../src/main/compose/ozone-om-ha/docker-config | 1 + .../src/main/compose/ozonesecure-ha/docker-config | 1 + .../dist/src/main/compose/ozonesecure-ha/test.sh | 2 +- .../hadoop/ozone/TestOzoneConfigurationFields.java | 3 +- .../hadoop/ozone/om/GrpcOzoneManagerServer.java | 20 ++- .../hadoop/ozone/om/OzoneManagerServiceGrpc.java | 43 +---- .../hadoop/ozone/om/failover/TestOMFailovers.java | 2 +- 14 files changed, 516 insertions(+), 98 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java index ffbb3e3340..364377d396 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsUtils.java @@ -227,6 +227,26 @@ public final class HddsUtils { } } + /** + * Retrieve a number, trying the supplied config keys in order. + * Each config value may be absent + * + * @param conf Conf + * @param keys a list of configuration key names. + * + * @return first number found from the given keys, or absent. + */ + public static OptionalInt getNumberFromConfigKeys( + ConfigurationSource conf, String... 
keys) { + for (final String key : keys) { + final String value = conf.getTrimmed(key); + if (value != null) { + return OptionalInt.of(Integer.parseInt(value)); + } + } + return OptionalInt.empty(); + } + /** * Retrieve the port number, trying the supplied config keys in order. * Each config value may be absent, or if present in the format diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java index fd8aa28e63..67001010d5 100644 --- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java +++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/TestHddsUtils.java @@ -36,6 +36,8 @@ import static org.apache.hadoop.hdds.HddsUtils.getSCMAddressForDatanodes; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_ADDRESS_KEY; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_DEFAULT; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_PORT_KEY; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT; + import static org.hamcrest.core.Is.is; import org.junit.Assert; import static org.junit.Assert.assertThat; @@ -216,4 +218,39 @@ public class TestHddsUtils { } -} \ No newline at end of file + @Test + public void testGetNumberFromConfigKeys() { + final String testnum1 = "8"; + final String testnum2 = "7"; + final String serviceId = "id1"; + final String nodeId = "scm1"; + + OzoneConfiguration conf = new OzoneConfiguration(); + conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, + testnum1); + Assert.assertTrue(Integer.parseInt(testnum1) == + HddsUtils.getNumberFromConfigKeys( + conf, + OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0)); + + /* Test to return first unempty key number from list */ + /* first key is absent */ + Assert.assertTrue(Integer.parseInt(testnum1) == + HddsUtils.getNumberFromConfigKeys( + conf, + 
ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY, + serviceId, nodeId), + OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0)); + + /* now set the empty key and ensure returned value from this key */ + conf.set(ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY, + serviceId, nodeId), + testnum2); + Assert.assertTrue(Integer.parseInt(testnum2) == + HddsUtils.getNumberFromConfigKeys( + conf, + ConfUtils.addKeySuffixes(OZONE_SCM_DATANODE_PORT_KEY, + serviceId, nodeId), + OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT).orElse(0)); + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java index cdd9e52667..6ebd7e11ad 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java @@ -57,7 +57,8 @@ public final class OMConfigKeys { public static final String OZONE_OM_BIND_HOST_DEFAULT = "0.0.0.0"; public static final int OZONE_OM_PORT_DEFAULT = 9862; - + public static final String OZONE_OM_GRPC_PORT_KEY = + "ozone.om.grpc.port"; public static final String OZONE_OM_HTTP_ENABLED_KEY = "ozone.om.http.enabled"; public static final String OZONE_OM_HTTP_BIND_HOST_KEY = diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java new file mode 100644 index 0000000000..498f935974 --- /dev/null +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/GrpcOMFailoverProxyProvider.java @@ -0,0 +1,143 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ozone.om.ha; + +import org.apache.hadoop.hdds.conf.ConfigurationException; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.HddsUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ozone.OmUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.ha.ConfUtils; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.protocolPB.GrpcOmTransport; +import org.apache.hadoop.security.UserGroupInformation; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.OptionalInt; + +import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys; +import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; + +/** + * The Grpc s3gateway om transport failover proxy provider implementation + * extending the ozone client OM failover proxy provider. This implementation + * allows the Grpc OMTransport to reuse OM failover retry policies and + * getRetryAction methods. In case of OM failover, client can try + * connecting to another OM node from the list of proxies.
+ */ +public class GrpcOMFailoverProxyProvider<T> extends + OMFailoverProxyProvider<T> { + + private Map<String, String> omAddresses; + + public GrpcOMFailoverProxyProvider(ConfigurationSource configuration, + UserGroupInformation ugi, + String omServiceId, + Class<T> protocol) throws IOException { + super(configuration, ugi, omServiceId, protocol); + } + + @Override + protected void loadOMClientConfigs(ConfigurationSource config, String omSvcId) + throws IOException { + // to be used for base class omProxies, + // ProxyInfo not applicable for gRPC, just need key set + Map<String, ProxyInfo<T>> omProxiesNodeIdKeyset = new HashMap<>(); + // to be used for base class omProxyInfos + // OMProxyInfo not applicable for gRPC, just need key set + Map<String, OMProxyInfo> omProxyInfosNodeIdKeyset = new HashMap<>(); + List<String> omNodeIDList = new ArrayList<>(); + omAddresses = new HashMap<>(); + + Collection<String> omNodeIds = OmUtils.getActiveOMNodeIds(config, omSvcId); + + for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) { + + String rpcAddrKey = ConfUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY, + omSvcId, nodeId); + + Optional<String> hostaddr = getHostNameFromConfigKeys(config, + rpcAddrKey); + + OptionalInt hostport = HddsUtils.getNumberFromConfigKeys(config, + ConfUtils.addKeySuffixes(OMConfigKeys.OZONE_OM_GRPC_PORT_KEY, + omSvcId, nodeId), + OMConfigKeys.OZONE_OM_GRPC_PORT_KEY); + if (nodeId == null) { + nodeId = OzoneConsts.OM_DEFAULT_NODE_ID; + } + omProxiesNodeIdKeyset.put(nodeId, null); + omProxyInfosNodeIdKeyset.put(nodeId, null); + if (hostaddr.isPresent()) { + omAddresses.put(nodeId, + hostaddr.get() + ":" + + hostport.orElse(config + .getObject(GrpcOmTransport + .GrpcOmTransportConfig.class) + .getPort())); + } else { + LOG.error("expected host address not defined for: {}", rpcAddrKey); + throw new ConfigurationException(rpcAddrKey + "is not defined"); + } + omNodeIDList.add(nodeId); + } + + if (omProxiesNodeIdKeyset.isEmpty()) { + throw new 
IllegalArgumentException("Could not find any configured " + + "addresses for OM. Please configure the system with " + + OZONE_OM_ADDRESS_KEY); + } + + // set base class omProxies, omProxyInfos, omNodeIDList + + // omProxies needed in base class + // omProxies.size == number of om nodes + // omProxies key needs to be valid nodeid + // omProxyInfos keyset needed in base class + setProxies(omProxiesNodeIdKeyset, omProxyInfosNodeIdKeyset, omNodeIDList); + } + + @Override + protected Text computeDelegationTokenService() { + return new Text(); + } + + // need to throw if nodeID not in omAddresses + public String getGrpcProxyAddress(String nodeId) throws IOException { + if (omAddresses.containsKey(nodeId)) { + return omAddresses.get(nodeId); + } else { + LOG.error("expected nodeId not found in omAddresses for proxyhost {}", + nodeId); + throw new IOException( + "expected nodeId not found in omAddresses for proxyhost"); + } + + } + + public List<String> getGrpcOmNodeIDList() { + return getOmNodeIDList(); + } +} diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java index 5432468452..9fb690e760 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java @@ -148,8 +148,6 @@ public class OMFailoverProxyProvider<T> implements rpcAddrStr); if (omProxyInfo.getAddress() != null) { - - // For a non-HA OM setup, nodeId might be null. 
If so, we assign it // the default value if (nodeId == null) { @@ -551,14 +549,18 @@ public class OMFailoverProxyProvider<T> implements return null; } - @VisibleForTesting - protected void setProxiesForTesting( - Map<String, ProxyInfo<T>> testOMProxies, - Map<String, OMProxyInfo> testOMProxyInfos, - List<String> testOMNodeIDList) { - this.omProxies = testOMProxies; - this.omProxyInfos = testOMProxyInfos; - this.omNodeIDList = testOMNodeIDList; + protected void setProxies( + Map<String, ProxyInfo<T>> setOMProxies, + Map<String, OMProxyInfo> setOMProxyInfos, + List<String> setOMNodeIDList) { + this.omProxies = setOMProxies; + this.omProxyInfos = setOMProxyInfos; + this.omNodeIDList = setOMNodeIDList; } + + protected List<String> getOmNodeIDList() { + return omNodeIDList; + } + } diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java index 3607429e52..72c29f0cc6 100644 --- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java +++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/GrpcOmTransport.java @@ -18,22 +18,34 @@ package org.apache.hadoop.ozone.om.protocolPB; import java.io.IOException; -import java.util.Optional; +import java.lang.reflect.Constructor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import com.google.common.net.HostAndPort; import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import org.apache.hadoop.ipc.RemoteException; + import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.io.Text; +import 
org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.ozone.om.exceptions.OMException; import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.ozone.om.ha.GrpcOMFailoverProxyProvider; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc; import io.grpc.ManagedChannel; import io.grpc.netty.NettyChannelBuilder; @@ -42,12 +54,10 @@ import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY; import static org.apache.hadoop.ozone.om.OMConfigKeys .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH; import static org.apache.hadoop.ozone.om.OMConfigKeys .OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT; -import static org.apache.hadoop.hdds.HddsUtils.getHostNameFromConfigKeys; /** * Grpc transport for grpc between s3g and om. 
@@ -60,60 +70,171 @@ public class GrpcOmTransport implements OmTransport { private final AtomicBoolean isRunning = new AtomicBoolean(false); // gRPC specific - private ManagedChannel channel; - private OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub client; + private Map<String, + OzoneManagerServiceGrpc.OzoneManagerServiceBlockingStub> clients; + private Map<String, ManagedChannel> channels; + private int lastVisited = -1; + private ConfigurationSource conf; - private String host = "om"; - private int port = 8981; + //private String host = "om"; + private AtomicReference<String> host; private int maxSize; + private List<String> oms; + private RetryPolicy retryPolicy; + private int failoverCount = 0; + private GrpcOMFailoverProxyProvider<OzoneManagerProtocolPB> + omFailoverProxyProvider; + public GrpcOmTransport(ConfigurationSource conf, UserGroupInformation ugi, String omServiceId) throws IOException { - Optional<String> omHost = getHostNameFromConfigKeys(conf, - OZONE_OM_ADDRESS_KEY); - this.host = omHost.orElse("0.0.0.0"); - port = conf.getObject(GrpcOmTransportConfig.class).getPort(); + this.channels = new HashMap<>(); + this.clients = new HashMap<>(); + this.conf = conf; + this.host = new AtomicReference(); maxSize = conf.getInt(OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH, OZONE_OM_GRPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT); + omFailoverProxyProvider = new GrpcOMFailoverProxyProvider( + conf, + ugi, + omServiceId, + OzoneManagerProtocolPB.class); + start(); } - public void start() { + public void start() throws IOException { + host.set(omFailoverProxyProvider + .getGrpcProxyAddress( + omFailoverProxyProvider.getCurrentProxyOMNodeId())); + if (!isRunning.compareAndSet(false, true)) { LOG.info("Ignore. 
already started."); return; } - NettyChannelBuilder channelBuilder = - NettyChannelBuilder.forAddress(host, port) - .usePlaintext() - .maxInboundMessageSize(maxSize); - channel = channelBuilder.build(); - client = OzoneManagerServiceGrpc.newBlockingStub(channel); + List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList(); + for (String nodeId : nodes) { + String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId); + HostAndPort hp = HostAndPort.fromString(hostaddr); + + NettyChannelBuilder channelBuilder = + NettyChannelBuilder.forAddress(hp.getHost(), hp.getPort()) + .usePlaintext() + .maxInboundMessageSize(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE); + channels.put(hostaddr, channelBuilder.build()); + clients.put(hostaddr, + OzoneManagerServiceGrpc + .newBlockingStub(channels.get(hostaddr))); + } + int maxFailovers = conf.getInt( + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, + OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT); + + retryPolicy = omFailoverProxyProvider.getRetryPolicy(maxFailovers); LOG.info("{}: started", CLIENT_NAME); } @Override public OMResponse submitRequest(OMRequest payload) throws IOException { OMResponse resp = null; - try { - resp = client.submitRequest(payload); - } catch (io.grpc.StatusRuntimeException e) { - ResultCodes resultCode = ResultCodes.INTERNAL_ERROR; - if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { - resultCode = ResultCodes.TIMEOUT; + boolean tryOtherHost = true; + ResultCodes resultCode = ResultCodes.INTERNAL_ERROR; + while (tryOtherHost) { + tryOtherHost = false; + try { + resp = clients.get(host.get()).submitRequest(payload); + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) { + resultCode = ResultCodes.TIMEOUT; + } + Exception exp = new Exception(e); + tryOtherHost = shouldRetry(unwrapException(exp)); + if (!tryOtherHost) { + throw new OMException(resultCode); + } } - throw new OMException(e.getCause(), resultCode); } return 
resp; } + private Exception unwrapException(Exception ex) { + Exception grpcException = null; + try { + StatusRuntimeException srexp = + (StatusRuntimeException)ex.getCause(); + Status status = srexp.getStatus(); + LOG.debug("GRPC exception wrapped: {}", status.getDescription()); + if (status.getCode() == Status.Code.INTERNAL) { + // exception potentially generated by OzoneManagerServiceGrpc + Class<?> realClass = Class.forName(status.getDescription() + .substring(0, status.getDescription() + .indexOf(":"))); + Class<? extends Exception> cls = realClass + .asSubclass(Exception.class); + Constructor<? extends Exception> cn = cls.getConstructor(String.class); + cn.setAccessible(true); + grpcException = cn.newInstance(status.getDescription()); + IOException remote = null; + try { + String cause = status.getDescription(); + cause = cause.substring(cause.indexOf(":") + 2); + remote = new RemoteException(cause.substring(0, cause.indexOf(":")), + cause.substring(cause.indexOf(":") + 1)); + grpcException.initCause(remote); + } catch (Exception e) { + LOG.error("cannot get cause for remote exception"); + } + } else { + // exception generated by connection failure, gRPC + grpcException = ex; + } + } catch (Exception e) { + grpcException = new IOException(e); + LOG.error("error unwrapping exception from OMResponse {}"); + } + return grpcException; + } + + private boolean shouldRetry(Exception ex) { + boolean retry = false; + RetryPolicy.RetryAction action = null; + try { + action = retryPolicy.shouldRetry((Exception)ex, 0, failoverCount++, true); + LOG.debug("grpc failover retry action {}", action.action); + if (action.action == RetryPolicy.RetryAction.RetryDecision.FAIL) { + retry = false; + LOG.error("Retry request failed. 
" + action.reason, ex); + } else { + if (action.action == RetryPolicy.RetryAction.RetryDecision.RETRY || + (action.action == RetryPolicy.RetryAction.RetryDecision + .FAILOVER_AND_RETRY)) { + if (action.delayMillis > 0) { + try { + Thread.sleep(action.delayMillis); + } catch (Exception e) { + LOG.error("Error trying sleep thread for {}", action.delayMillis); + } + } + // switch om host to current proxy OMNodeId + host.set(omFailoverProxyProvider + .getGrpcProxyAddress( + omFailoverProxyProvider.getCurrentProxyOMNodeId())); + retry = true; + } + } + } catch (Exception e) { + LOG.error("Failed failover exception {}", e); + } + return retry; + } + // stub implementation for interface @Override public Text getDelegationTokenService() { @@ -121,11 +242,15 @@ public class GrpcOmTransport implements OmTransport { } public void shutdown() { - channel.shutdown(); - try { - channel.awaitTermination(5, TimeUnit.SECONDS); - } catch (Exception e) { - LOG.error("failed to shutdown OzoneManagerServiceGrpc channel", e); + for (Map.Entry<String, ManagedChannel> entry : channels.entrySet()) { + ManagedChannel channel = entry.getValue(); + channel.shutdown(); + try { + channel.awaitTermination(5, TimeUnit.SECONDS); + } catch (Exception e) { + LOG.error("failed to shutdown OzoneManagerServiceGrpc channel {} : {}", + entry.getKey(), e); + } } } @@ -156,9 +281,16 @@ public class GrpcOmTransport implements OmTransport { } @VisibleForTesting - public void startClient(ManagedChannel testChannel) { - client = OzoneManagerServiceGrpc.newBlockingStub(testChannel); + public void startClient(ManagedChannel testChannel) throws IOException { + List<String> nodes = omFailoverProxyProvider.getGrpcOmNodeIDList(); + for (String nodeId : nodes) { + String hostaddr = omFailoverProxyProvider.getGrpcProxyAddress(nodeId); + clients.put(hostaddr, + OzoneManagerServiceGrpc + .newBlockingStub(testChannel)); + } LOG.info("{}: started", CLIENT_NAME); } + } diff --git 
a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java index 323bb0eeb3..b427db5562 100644 --- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java +++ b/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/om/protocolPB/TestS3GrpcOmTransport.java @@ -25,25 +25,29 @@ import static org.mockito.Mockito.mock; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.testing.GrpcCleanupRule; +import io.grpc.ManagedChannel; import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.ozone.OzoneConfigKeys; +import org.apache.hadoop.ozone.om.exceptions.OMNotLeaderException; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMResponse; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServiceListRequest; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type; -import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; -import io.grpc.ManagedChannel; +import com.google.protobuf.ServiceException; +import org.apache.ratis.protocol.RaftPeerId; -import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.OK; +import static org.junit.Assert.fail; /** * Tests for GrpcOmTransport client. 
@@ -59,11 +63,32 @@ public class TestS3GrpcOmTransport { private final OMResponse omResponse = OMResponse.newBuilder() .setSuccess(true) - .setStatus(Status.OK) + .setStatus(org.apache.hadoop.ozone.protocol + .proto.OzoneManagerProtocolProtos.Status.OK) .setLeaderOMNodeId(leaderOMNodeId) .setCmdType(Type.AllocateBlock) .build(); + private boolean doFailover = false; + + private OzoneConfiguration conf; + + private String omServiceId; + private UserGroupInformation ugi; + private ManagedChannel channel; + + + private ServiceException createNotLeaderException() { + RaftPeerId raftPeerId = RaftPeerId.getRaftPeerId("testNodeId"); + + // TODO: Set suggest leaderID. Right now, client is not using suggest + // leaderID. Need to fix this. + OMNotLeaderException notLeaderException = + new OMNotLeaderException(raftPeerId); + LOG.debug(notLeaderException.getMessage()); + return new ServiceException(notLeaderException); + } + private final OzoneManagerServiceGrpc.OzoneManagerServiceImplBase serviceImpl = mock(OzoneManagerServiceGrpc.OzoneManagerServiceImplBase.class, @@ -78,10 +103,22 @@ public class TestS3GrpcOmTransport { .OzoneManagerProtocolProtos .OMResponse> responseObserver) { - responseObserver.onNext(omResponse); - responseObserver.onCompleted(); + try { + if (doFailover) { + doFailover = false; + throw createNotLeaderException(); + } else { + responseObserver.onNext(omResponse); + responseObserver.onCompleted(); + } + } catch (Throwable e) { + IOException ex = new IOException(e.getCause()); + responseObserver.onError(io.grpc.Status + .INTERNAL + .withDescription(ex.getMessage()) + .asRuntimeException()); + } } - })); private GrpcOmTransport client; @@ -101,18 +138,37 @@ public class TestS3GrpcOmTransport { .start()); // Create a client channel and register for automatic graceful shutdown. 
- ManagedChannel channel = grpcCleanup.register( + channel = grpcCleanup.register( InProcessChannelBuilder.forName(serverName).directExecutor().build()); - String omServiceId = ""; - OzoneConfiguration conf = new OzoneConfiguration(); - UserGroupInformation ugi = UserGroupInformation.getCurrentUser(); + omServiceId = ""; + conf = new OzoneConfiguration(); + ugi = UserGroupInformation.getCurrentUser(); + doFailover = false; + } + + @Test + public void testSubmitRequestToServer() throws Exception { + ServiceListRequest req = ServiceListRequest.newBuilder().build(); + + final OMRequest omRequest = OMRequest.newBuilder() + .setCmdType(Type.ServiceList) + .setVersion(CURRENT_VERSION) + .setClientId("test") + .setServiceListRequest(req) + .build(); + client = new GrpcOmTransport(conf, ugi, omServiceId); client.startClient(channel); + + final OMResponse resp = client.submitRequest(omRequest); + Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol + .proto.OzoneManagerProtocolProtos.Status.OK); + Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId); } @Test - public void testSubmitRequestToServer() throws Exception { + public void testGrpcFailoverProxy() throws Exception { ServiceListRequest req = ServiceListRequest.newBuilder().build(); final OMRequest omRequest = OMRequest.newBuilder() @@ -122,8 +178,45 @@ public class TestS3GrpcOmTransport { .setServiceListRequest(req) .build(); + client = new GrpcOmTransport(conf, ugi, omServiceId); + client.startClient(channel); + + doFailover = true; + // first invocation generates a NotALeaderException + // failover is performed and request is internally retried + // second invocation request to server succeeds final OMResponse resp = client.submitRequest(omRequest); - Assert.assertEquals(resp.getStatus(), OK); + Assert.assertEquals(resp.getStatus(), org.apache.hadoop.ozone.protocol + .proto.OzoneManagerProtocolProtos.Status.OK); Assert.assertEquals(resp.getLeaderOMNodeId(), leaderOMNodeId); } + + @Test + 
public void testGrpcFailoverProxyExhaustRetry() throws Exception { + ServiceListRequest req = ServiceListRequest.newBuilder().build(); + + final OMRequest omRequest = OMRequest.newBuilder() + .setCmdType(Type.ServiceList) + .setVersion(CURRENT_VERSION) + .setClientId("test") + .setServiceListRequest(req) + .build(); + + conf.setInt(OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 0); + client = new GrpcOmTransport(conf, ugi, omServiceId); + client.startClient(channel); + + doFailover = true; + // first invocation generates a NotALeaderException + // failover is performed and request is internally retried + // OMFailoverProvider returns Fail retry due to #attempts > + // max failovers + + try { + final OMResponse resp = client.submitRequest(omRequest); + fail(); + } catch (Exception e) { + Assert.assertTrue(true); + } + } } diff --git a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config index 69f4e52eae..4642680394 100644 --- a/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozone-om-ha/docker-config @@ -36,6 +36,7 @@ OZONE-SITE.XML_hdds.datanode.dir=/data/hdds OZONE-SITE.XML_hdds.profiler.endpoint.enabled=true OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s OZONE-SITE.XML_hdds.container.report.interval=60s +OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true HDFS-SITE.XML_rpc.metrics.quantile.enable=true HDFS-SITE.XML_rpc.metrics.percentiles.intervals=60,300 ASYNC_PROFILER_HOME=/opt/profiler diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config index 498d02efae..be93d0a6ec 100644 --- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config +++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/docker-config @@ -51,6 +51,7 @@ OZONE-SITE.XML_hdds.grpc.tls.enabled=true OZONE-SITE.XML_ozone.replication=3 
OZONE-SITE.XML_hdds.scmclient.max.retry.timeout=30s OZONE-SITE.XML_hdds.container.report.interval=60s +OZONE-SITE.XML_ozone.om.s3.grpc.server_enabled=true OZONE-SITE.XML_ozone.recon.om.snapshot.task.interval.delay=1m OZONE-SITE.XML_ozone.recon.db.dir=/data/metadata/recon diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh index 7410822cfa..252f953163 100755 --- a/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh +++ b/hadoop-ozone/dist/src/main/compose/ozonesecure-ha/test.sh @@ -35,7 +35,7 @@ execute_robot_test ${SCM} freon execute_robot_test ${SCM} basic/links.robot -#execute_robot_test ${SCM} s3 +execute_robot_test ${SCM} s3 execute_robot_test ${SCM} admincli diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 3269c394f7..1c772cf46b 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -111,7 +111,8 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase { ReconServerConfigKeys.RECON_OM_SNAPSHOT_TASK_FLUSH_PARAM, OMConfigKeys.OZONE_OM_RATIS_SNAPSHOT_AUTO_TRIGGER_THRESHOLD_KEY, OMConfigKeys.OZONE_OM_HA_PREFIX, - OMConfigKeys.OZONE_OM_TRANSPORT_CLASS + OMConfigKeys.OZONE_OM_TRANSPORT_CLASS, + OMConfigKeys.OZONE_OM_GRPC_PORT_KEY // TODO HDDS-2856 )); } diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java index 60942f971b..7fe338c83e 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java +++ 
b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/GrpcOzoneManagerServer.java @@ -18,13 +18,16 @@ package org.apache.hadoop.ozone.om; import java.io.IOException; +import java.util.OptionalInt; import java.util.concurrent.TimeUnit; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.Config; import org.apache.hadoop.hdds.conf.ConfigGroup; import org.apache.hadoop.hdds.conf.ConfigTag; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.ha.ConfUtils; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.security.OzoneDelegationTokenSecretManager; import io.grpc.Server; @@ -47,9 +50,20 @@ public class GrpcOzoneManagerServer { omTranslator, OzoneDelegationTokenSecretManager delegationTokenMgr) { - this.port = config.getObject( - GrpcOzoneManagerServerConfig.class). - getPort(); + OptionalInt haPort = HddsUtils.getNumberFromConfigKeys(config, + ConfUtils.addKeySuffixes( + OMConfigKeys.OZONE_OM_GRPC_PORT_KEY, + config.get(OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY), + config.get(OMConfigKeys.OZONE_OM_NODE_ID_KEY)), + OMConfigKeys.OZONE_OM_GRPC_PORT_KEY); + if (haPort.isPresent()) { + this.port = haPort.getAsInt(); + } else { + this.port = config.getObject( + GrpcOzoneManagerServerConfig.class). 
+ getPort(); + } + init(omTranslator, delegationTokenMgr, config); diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java index de11608703..a88e259a28 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManagerServiceGrpc.java @@ -17,13 +17,13 @@ */ package org.apache.hadoop.ozone.om; +import io.grpc.Status; import com.google.protobuf.RpcController; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.ipc.ClientId; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ozone.om.ratis.utils.OzoneManagerRatisUtils; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerServiceGrpc.OzoneManagerServiceImplBase; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.protocol.proto @@ -68,7 +68,6 @@ public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase { "processing s3g client submit request - for command {}", request.getCmdType().name()); AtomicInteger callCount = new AtomicInteger(0); - OMResponse omResponse = null; org.apache.hadoop.ipc.Server.getCurCall().set(new Server.Call(1, callCount.incrementAndGet(), @@ -84,42 +83,16 @@ public class OzoneManagerServiceGrpc extends OzoneManagerServiceImplBase { // for OMRequests. Test through successful ratis-enabled OMRequest // handling without dependency on hadoop IPC based Server. try { - omResponse = this.omTranslator. + OMResponse omResponse = this.omTranslator. 
submitRequest(NULL_RPC_CONTROLLER, request); + responseObserver.onNext(omResponse); } catch (Throwable e) { - IOException ioe = null; - Throwable se = e.getCause(); - if (se == null) { - ioe = new IOException(e); - } else { - ioe = se instanceof IOException ? - (IOException) se : new IOException(e); - } - omResponse = createErrorResponse( - request, - ioe); + IOException ex = new IOException(e.getCause()); + responseObserver.onError(Status + .INTERNAL + .withDescription(ex.getMessage()) + .asRuntimeException()); } - responseObserver.onNext(omResponse); responseObserver.onCompleted(); } - - /** - * Create OMResponse from the specified OMRequest and exception. - * - * @param omRequest - * @param exception - * @return OMResponse - */ - private OMResponse createErrorResponse( - OMRequest omRequest, IOException exception) { - OMResponse.Builder omResponse = OMResponse.newBuilder() - .setStatus(OzoneManagerRatisUtils.exceptionToResponseStatus(exception)) - .setCmdType(omRequest.getCmdType()) - .setTraceID(omRequest.getTraceID()) - .setSuccess(false); - if (exception.getMessage() != null) { - omResponse.setMessage(exception.getMessage()); - } - return omResponse.build(); - } } diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java index 01601668b6..fe7f6f49ea 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/failover/TestOMFailovers.java @@ -143,7 +143,7 @@ public class TestOMFailovers { omProxyInfos.put(nodeId, null); omNodeIDList.add(nodeId); } - setProxiesForTesting(omProxies, omProxyInfos, omNodeIDList); + setProxies(omProxies, omProxyInfos, omNodeIDList); } @Override --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For 
additional commands, e-mail: [email protected]
