This is an automated email from the ASF dual-hosted git repository. xyao pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push: new 138d33e HDDS-3188 Add failover proxy for SCM block location. (#1340) 138d33e is described below commit 138d33ec0c6a0385f6a59f42fadaaec079ddb6c0 Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Thu Oct 1 00:27:09 2020 +0800 HDDS-3188 Add failover proxy for SCM block location. (#1340) --- .../hadoop/hdds/scm/exceptions/SCMException.java | 3 +- ...lockLocationProtocolClientSideTranslatorPB.java | 25 +- .../SCMBlockLocationFailoverProxyProvider.java | 280 +++++++++++++++++++++ .../hadoop/hdds/scm/proxy/SCMClientConfig.java | 103 ++++++++ .../apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java | 73 ++++++ .../apache/hadoop/hdds/scm/proxy/package-info.java | 22 ++ .../src/main/proto/ScmServerProtocol.proto | 3 + ...lockLocationProtocolServerSideTranslatorPB.java | 18 ++ .../hdds/scm/server/SCMBlockProtocolServer.java | 4 + .../hdds/scm/server/StorageContainerManager.java | 19 ++ .../org/apache/hadoop/ozone/om/OzoneManager.java | 11 +- 11 files changed, 545 insertions(+), 16 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index db1f82a..11b7b3c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -122,6 +122,7 @@ public class SCMException extends IOException { FAILED_TO_FIND_ACTIVE_PIPELINE, FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY, FAILED_TO_ALLOCATE_ENOUGH_BLOCKS, - INTERNAL_ERROR + INTERNAL_ERROR, + SCM_NOT_LEADER } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index e86ee81..12c51f6 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.annotation.InterfaceAudience; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Type; @@ -45,10 +46,11 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider; import org.apache.hadoop.hdds.tracing.TracingUtil; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; -import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; @@ -73,15 +75,21 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB private static final RpcController NULL_RPC_CONTROLLER = null; private final ScmBlockLocationProtocolPB rpcProxy; + private SCMBlockLocationFailoverProxyProvider failoverProxyProvider; /** * Creates a new StorageContainerLocationProtocolClientSideTranslatorPB. * - * @param rpcProxy {@link StorageContainerLocationProtocolPB} RPC proxy + * @param proxyProvider {@link SCMBlockLocationFailoverProxyProvider} + * failover proxy provider. */ public ScmBlockLocationProtocolClientSideTranslatorPB( - ScmBlockLocationProtocolPB rpcProxy) { - this.rpcProxy = rpcProxy; + SCMBlockLocationFailoverProxyProvider proxyProvider) { + Preconditions.checkState(proxyProvider != null); + this.failoverProxyProvider = proxyProvider; + this.rpcProxy = (ScmBlockLocationProtocolPB) RetryProxy.create( + ScmBlockLocationProtocolPB.class, failoverProxyProvider, + failoverProxyProvider.getSCMBlockLocationRetryPolicy(null)); } /** @@ -105,6 +113,11 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB try { SCMBlockLocationResponse response = rpcProxy.send(NULL_RPC_CONTROLLER, req); + if (response.getStatus() == + ScmBlockLocationProtocolProtos.Status.SCM_NOT_LEADER) { + failoverProxyProvider + .performFailoverToAssignedLeader(response.getLeaderSCMNodeId()); + } return response; } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); @@ -267,7 +280,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB } @Override - public void close() { - RPC.stopProxy(rpcProxy); + public void close() throws IOException { + failoverProxyProvider.close(); } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java new file mode 100644 index 0000000..1beb69e --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMBlockLocationFailoverProxyProvider.java @@ -0,0 +1,280 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.proxy; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; +import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; +import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; +import org.apache.hadoop.io.retry.FailoverProxyProvider; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.ipc.ProtobufRpcEngine; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.UserGroupInformation; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY; +import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients; +import static org.apache.hadoop.hdds.HddsUtils.getPortNumberFromConfigKeys; +import static org.apache.hadoop.hdds.HddsUtils.getHostName; + +/** + * Failover proxy provider for SCM. + */ +public class SCMBlockLocationFailoverProxyProvider implements + FailoverProxyProvider<ScmBlockLocationProtocolPB>, Closeable { + public static final Logger LOG = + LoggerFactory.getLogger(SCMBlockLocationFailoverProxyProvider.class); + + private Map<String, ProxyInfo<ScmBlockLocationProtocolPB>> scmProxies; + private Map<String, SCMProxyInfo> scmProxyInfoMap; + private List<String> scmNodeIDList; + + private String currentProxySCMNodeId; + private int currentProxyIndex; + + private final ConfigurationSource conf; + private final long scmVersion; + + private final String scmServiceId; + + private String lastAttemptedLeader; + + private final int maxRetryCount; + private final long retryInterval; + + public static final String SCM_DUMMY_NODEID_PREFIX = "scm"; + + public SCMBlockLocationFailoverProxyProvider(ConfigurationSource conf) { + this.conf = conf; + this.scmVersion = RPC.getProtocolVersion(ScmBlockLocationProtocol.class); + this.scmServiceId = conf.getTrimmed(OZONE_SCM_SERVICE_IDS_KEY); + this.scmProxies = new HashMap<>(); + this.scmProxyInfoMap = new HashMap<>(); + this.scmNodeIDList = new ArrayList<>(); + loadConfigs(); + + + this.currentProxyIndex = 0; + currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex); + + SCMClientConfig config = conf.getObject(SCMClientConfig.class); + this.maxRetryCount = config.getRetryCount(); + this.retryInterval = config.getRetryInterval(); + } + + @VisibleForTesting + protected Collection<InetSocketAddress> getSCMAddressList() { + Collection<String> scmAddressList = + conf.getTrimmedStringCollection(OZONE_SCM_NAMES); + Collection<InetSocketAddress> resultList = new ArrayList<>(); + if (!scmAddressList.isEmpty()) { + final int port = getPortNumberFromConfigKeys(conf, + ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY) + .orElse(ScmConfigKeys.OZONE_SCM_BLOCK_CLIENT_PORT_DEFAULT); + for (String scmAddress : scmAddressList) { + LOG.info("SCM Address for proxy is {}", scmAddress); + + Optional<String> hostname = getHostName(scmAddress); + if (hostname.isPresent()) { + resultList.add(NetUtils.createSocketAddr( + hostname.get() + ":" + port)); + } + } + } + if (resultList.isEmpty()) { + // fall back + resultList.add(getScmAddressForBlockClients(conf)); + } + return resultList; + } + + private void loadConfigs() { + Collection<InetSocketAddress> scmAddressList = getSCMAddressList(); + int scmNodeIndex = 1; + for (InetSocketAddress scmAddress : scmAddressList) { + String nodeId = SCM_DUMMY_NODEID_PREFIX + scmNodeIndex; + if (scmAddress == null) { + LOG.error("Failed to create SCM proxy for {}.", nodeId); + continue; + } + scmNodeIndex++; + SCMProxyInfo scmProxyInfo = new SCMProxyInfo( + scmServiceId, nodeId, scmAddress); + ProxyInfo<ScmBlockLocationProtocolPB> proxy = new ProxyInfo<>( + null, scmProxyInfo.toString()); + scmProxies.put(nodeId, proxy); + scmProxyInfoMap.put(nodeId, scmProxyInfo); + scmNodeIDList.add(nodeId); + } + + if (scmProxies.isEmpty()) { + throw new IllegalArgumentException("Could not find any configured " + + "addresses for SCM. Please configure the system with " + + OZONE_SCM_NAMES); + } + } + + @VisibleForTesting + public synchronized String getCurrentProxyOMNodeId() { + return currentProxySCMNodeId; + } + + @Override + public synchronized ProxyInfo getProxy() { + ProxyInfo currentProxyInfo = scmProxies.get(currentProxySCMNodeId); + createSCMProxyIfNeeded(currentProxyInfo, currentProxySCMNodeId); + return currentProxyInfo; + } + + @Override + public void performFailover(ScmBlockLocationProtocolPB newLeader) { + // Should do nothing here. + LOG.debug("Failing over to next proxy. {}", getCurrentProxyOMNodeId()); + } + + public void performFailoverToAssignedLeader(String newLeader) { + if (newLeader == null) { + // If newLeader is not assigned, it will fail over to next proxy. + nextProxyIndex(); + } else { + if (!assignLeaderToNode(newLeader)) { + LOG.debug("Failing over OM proxy to nodeId: {}", newLeader); + nextProxyIndex(); + } + } + } + + @Override + public Class<ScmBlockLocationProtocolPB> getInterface() { + return ScmBlockLocationProtocolPB.class; + } + + @Override + public synchronized void close() throws IOException { + for (ProxyInfo<ScmBlockLocationProtocolPB> proxy : scmProxies.values()) { + ScmBlockLocationProtocolPB scmProxy = proxy.proxy; + if (scmProxy != null) { + RPC.stopProxy(scmProxy); + } + } + } + + public RetryAction getRetryAction(int failovers) { + if (failovers < maxRetryCount) { + return new RetryAction(RetryAction.RetryDecision.FAILOVER_AND_RETRY, + getRetryInterval()); + } else { + return RetryAction.FAIL; + } + } + + private synchronized long getRetryInterval() { + // TODO add exponential backup + return retryInterval; + } + + private synchronized int nextProxyIndex() { + lastAttemptedLeader = currentProxySCMNodeId; + + // round robin the next proxy + currentProxyIndex = (currentProxyIndex + 1) % scmProxies.size(); + currentProxySCMNodeId = scmNodeIDList.get(currentProxyIndex); + return currentProxyIndex; + } + + private synchronized boolean assignLeaderToNode(String newLeaderNodeId) { + if (!currentProxySCMNodeId.equals(newLeaderNodeId)) { + if (scmProxies.containsKey(newLeaderNodeId)) { + lastAttemptedLeader = currentProxySCMNodeId; + currentProxySCMNodeId = newLeaderNodeId; + currentProxyIndex = scmNodeIDList.indexOf(currentProxySCMNodeId); + return true; + } + } else { + lastAttemptedLeader = currentProxySCMNodeId; + } + return false; + } + + /** + * Creates proxy object if it does not already exist. + */ + private void createSCMProxyIfNeeded(ProxyInfo proxyInfo, + String nodeId) { + if (proxyInfo.proxy == null) { + InetSocketAddress address = scmProxyInfoMap.get(nodeId).getAddress(); + try { + ScmBlockLocationProtocolPB proxy = createSCMProxy(address); + try { + proxyInfo.proxy = proxy; + } catch (IllegalAccessError iae) { + scmProxies.put(nodeId, + new ProxyInfo<>(proxy, proxyInfo.proxyInfo)); + } + } catch (IOException ioe) { + LOG.error("{} Failed to create RPC proxy to SCM at {}", + this.getClass().getSimpleName(), address, ioe); + throw new RuntimeException(ioe); + } + } + } + + private ScmBlockLocationProtocolPB createSCMProxy( + InetSocketAddress scmAddress) throws IOException { + Configuration hadoopConf = + LegacyHadoopConfigurationSource.asHadoopConfiguration(conf); + RPC.setProtocolEngine(hadoopConf, ScmBlockLocationProtocol.class, + ProtobufRpcEngine.class); + return RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion, + scmAddress, UserGroupInformation.getCurrentUser(), hadoopConf, + NetUtils.getDefaultSocketFactory(hadoopConf), + (int)conf.getObject(SCMClientConfig.class).getRpcTimeOut()); + } + + public RetryPolicy getSCMBlockLocationRetryPolicy(String newLeader) { + RetryPolicy retryPolicy = new RetryPolicy() { + @Override + public RetryAction shouldRetry(Exception e, int retry, + int failover, boolean b) { + performFailoverToAssignedLeader(newLeader); + return getRetryAction(failover); + } + }; + return retryPolicy; + } +} + diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java new file mode 100644 index 0000000..99dc446 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMClientConfig.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdds.scm.proxy; + +import org.apache.hadoop.hdds.conf.Config; +import org.apache.hadoop.hdds.conf.ConfigGroup; +import org.apache.hadoop.hdds.conf.ConfigType; + +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.conf.ConfigTag.CLIENT; +import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE; +import static org.apache.hadoop.hdds.conf.ConfigTag.SCM; + +/** + * Config for SCM Block Client. + */ +@ConfigGroup(prefix = "hdds.scmclient") +public class SCMClientConfig { + public static final String SCM_CLIENT_RPC_TIME_OUT = "rpc.timeout"; + public static final String SCM_CLIENT_FAILOVER_MAX_RETRY = + "failover.max.retry"; + public static final String SCM_CLIENT_RETRY_INTERVAL = + "failover.retry.interval"; + + @Config(key = SCM_CLIENT_RPC_TIME_OUT, + defaultValue = "15m", + type = ConfigType.TIME, + tags = {OZONE, SCM, CLIENT}, + timeUnit = TimeUnit.MILLISECONDS, + description = "RpcClient timeout on waiting for the response from " + + "SCM. The default value is set to 15 minutes. " + + "If ipc.client.ping is set to true and this rpc-timeout " + + "is greater than the value of ipc.ping.interval, the effective " + + "value of the rpc-timeout is rounded up to multiple of " + + "ipc.ping.interval." + ) + private long rpcTimeOut = 15 * 60 * 1000; + + @Config(key = SCM_CLIENT_FAILOVER_MAX_RETRY, + defaultValue = "15", + type = ConfigType.INT, + tags = {OZONE, SCM, CLIENT}, + description = "Max retry count for SCM Client when failover happens." + ) + private int retryCount = 15; + + @Config(key = SCM_CLIENT_RETRY_INTERVAL, + defaultValue = "2s", + type = ConfigType.TIME, + tags = {OZONE, SCM, CLIENT}, + timeUnit = TimeUnit.MILLISECONDS, + description = "SCM Client timeout on waiting for the next connection " + + "retry to other SCM IP. The default value is set to 2 minutes. " + ) + private long retryInterval = 2 * 1000; + + public long getRpcTimeOut() { + return rpcTimeOut; + } + + public void setRpcTimeOut(long timeOut) { + // As at the end this value should not exceed MAX_VALUE, as underlying + // Rpc layer SocketTimeout parameter is int. + if (rpcTimeOut > Integer.MAX_VALUE) { + this.rpcTimeOut = Integer.MAX_VALUE; + } + this.rpcTimeOut = timeOut; + } + + public int getRetryCount() { + return retryCount; + } + + public void setRetryCount(int retryCount) { + this.retryCount = retryCount; + } + + public long getRetryInterval() { + return retryInterval; + } + + public void setRetryInterval(long retryInterval) { + this.retryInterval = retryInterval; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java new file mode 100644 index 0000000..ec2a5b0 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/SCMProxyInfo.java @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.proxy; + +import com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetSocketAddress; + +/** + * Class to store SCM proxy info. + */ +public class SCMProxyInfo { + private String serviceId; + private String nodeId; + private String rpcAddrStr; + private InetSocketAddress rpcAddr; + + private static final Logger LOG = + LoggerFactory.getLogger(SCMProxyInfo.class); + + public SCMProxyInfo(String serviceID, String nodeID, + InetSocketAddress rpcAddress) { + Preconditions.checkNotNull(rpcAddress); + this.serviceId = serviceID; + this.nodeId = nodeID; + this.rpcAddrStr = rpcAddress.toString(); + this.rpcAddr = rpcAddress; + if (rpcAddr.isUnresolved()) { + LOG.warn("SCM address {} for serviceID {} remains unresolved " + + "for node ID {} Check your ozone-site.xml file to ensure scm " + + "addresses are configured properly.", + rpcAddress, serviceId, nodeId); + } + } + + public String toString() { + return new StringBuilder() + .append("nodeId=") + .append(nodeId) + .append(",nodeAddress=") + .append(rpcAddrStr).toString(); + } + + public InetSocketAddress getAddress() { + return rpcAddr; + } + + public String getServiceId() { + return serviceId; + } + + public String getNodeId() { + return nodeId; + } +} diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java new file mode 100644 index 0000000..e3bb058 --- /dev/null +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/proxy/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.proxy; + +/** + * This package contains classes related to scm proxy. + */ diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto index fc7a598..06f9c31 100644 --- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto +++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto @@ -70,6 +70,8 @@ message SCMBlockLocationResponse { optional string leaderOMNodeId = 6; + optional string leaderSCMNodeId = 7; + optional AllocateScmBlockResponseProto allocateScmBlockResponse = 11; optional DeleteScmKeyBlocksResponseProto deleteScmKeyBlocksResponse = 12; optional hadoop.hdds.GetScmInfoResponseProto getScmInfoResponse = 13; @@ -114,6 +116,7 @@ enum Status { FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY = 26; FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27; INTERNAL_ERROR = 29; + SCM_NOT_LEADER = 30; } /** diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java index fb07351..eec0718 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer; import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher; import org.apache.hadoop.ozone.common.BlockGroup; import org.apache.hadoop.ozone.common.DeleteBlockGroupResult; @@ -94,9 +95,26 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB .setTraceID(traceID); } + private boolean isLeader() throws ServiceException { + if (!(impl instanceof SCMBlockProtocolServer)) { + throw new ServiceException("Should be SCMBlockProtocolServer"); + } else { + return ((SCMBlockProtocolServer) impl).getScm().checkLeader(); + } + } + @Override public SCMBlockLocationResponse send(RpcController controller, SCMBlockLocationRequest request) throws ServiceException { + if (!isLeader()) { + SCMBlockLocationResponse.Builder response = createSCMBlockResponse( + request.getCmdType(), + request.getTraceID()); + response.setSuccess(false); + response.setStatus(Status.SCM_NOT_LEADER); + response.setLeaderSCMNodeId(null); + return response.build(); + } return dispatcher.processRequest( request, this::processMessage, diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java index 99f873f..e334b73 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java @@ -297,6 +297,10 @@ public class SCMBlockProtocolServer implements } } + public StorageContainerManager getScm() { + return scm; + } + @Override public List<DatanodeDetails> sortDatanodes(List<String> nodes, String clientMachine) throws IOException { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 768ca09..44e133a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -1027,6 +1027,25 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl return replicationManager; } + /** + * Check if the current scm is the leader. + * @return - if the current scm is the leader. + */ + public boolean checkLeader() { + return scmHAManager.isLeader(); + } + + /** + * Get suggested leader from Raft. + * @return - suggested leader address. + */ + public String getSuggestedLeader() { + if (scmHAManager.getSuggestedLeader() == null) { + return null; + } + return scmHAManager.getSuggestedLeader().getAddress(); + } + public void checkAdminAccess(String remoteUser) throws IOException { if (remoteUser != null && !scmAdminUsernames.contains(remoteUser)) { throw new IOException( diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java index 01340cd..3129dee 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java @@ -70,6 +70,7 @@ import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolClientSideT import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB; import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB; +import org.apache.hadoop.hdds.scm.proxy.SCMBlockLocationFailoverProxyProvider; import org.apache.hadoop.hdds.security.x509.SecurityConfig; import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient; import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient; @@ -186,7 +187,6 @@ import org.apache.commons.lang3.StringUtils; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED; import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED_DEFAULT; -import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForBlockClients; import static org.apache.hadoop.hdds.HddsUtils.getScmAddressForClients; import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString; import static org.apache.hadoop.hdds.server.ServerUtils.getRemoteUserName; @@ -807,16 +807,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl OzoneConfiguration conf) throws IOException { RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class, ProtobufRpcEngine.class); - long scmVersion = - RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class); - InetSocketAddress scmBlockAddress = - getScmAddressForBlockClients(conf); ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient = new ScmBlockLocationProtocolClientSideTranslatorPB( - RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion, - scmBlockAddress, UserGroupInformation.getCurrentUser(), conf, - NetUtils.getDefaultSocketFactory(conf), - Client.getRpcTimeout(conf))); + new SCMBlockLocationFailoverProxyProvider(conf)); return TracingUtil .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class, conf); --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org