This is an automated email from the ASF dual-hosted git repository. licheng pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push: new 7ddaa07 HDDS-4192: enable SCM Raft Group based on config ozone.scm.names (#1428) 7ddaa07 is described below commit 7ddaa07d7de696c71113670c2f092cbc14f06658 Author: GlenGeng <gleng...@tencent.com> AuthorDate: Sat Oct 10 11:28:06 2020 +0800 HDDS-4192: enable SCM Raft Group based on config ozone.scm.names (#1428) * HDDS-4192: enable SCM Raft Group based on config ozone.scm.names * HDDS-4192: fix comments --- .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 116 +++++++++++++++++++-- 1 file changed, 106 insertions(+), 10 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java index 33ae109..8611b1f 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMRatisServerImpl.java @@ -18,17 +18,22 @@ package org.apache.hadoop.hdds.scm.ha; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicLong; +import java.util.stream.Collectors; +import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol.RequestType; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.protocol.ClientId; import org.apache.ratis.protocol.RaftClientReply; @@ -38,11 +43,15 @@ import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftPeer; import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.server.RaftServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * TODO. */ public class SCMRatisServerImpl implements SCMRatisServer { + private static final Logger LOG = + LoggerFactory.getLogger(SCMRatisServerImpl.class); private final InetSocketAddress address; private final RaftServer server; @@ -53,24 +62,20 @@ public class SCMRatisServerImpl implements SCMRatisServer { private final ClientId clientId = ClientId.randomId(); private final AtomicLong callId = new AtomicLong(); - // TODO: Refactor and remove ConfigurationSource and use only // SCMHAConfiguration. SCMRatisServerImpl(final SCMHAConfiguration haConf, final ConfigurationSource conf) throws IOException { - final String scmServiceId = "SCM-HA-Service"; - final String scmNodeId = "localhost"; - this.raftPeerId = RaftPeerId.getRaftPeerId(scmNodeId); this.address = haConf.getRatisBindAddress(); - final RaftPeer localRaftPeer = new RaftPeer(raftPeerId, address); - final List<RaftPeer> raftPeers = new ArrayList<>(); - raftPeers.add(localRaftPeer); + + SCMHAGroupBuilder scmHAGroupBuilder = new SCMHAGroupBuilder(haConf, conf); + this.raftPeerId = scmHAGroupBuilder.getPeerId(); + this.raftGroupId = scmHAGroupBuilder.getRaftGroupId(); + this.raftGroup = scmHAGroupBuilder.getRaftGroup(); + final RaftProperties serverProperties = RatisUtil .newRaftProperties(haConf, conf); - this.raftGroupId = RaftGroupId.valueOf( - UUID.nameUUIDFromBytes(scmServiceId.getBytes(StandardCharsets.UTF_8))); - this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers); this.scmStateMachine = new SCMStateMachine(); this.server = RaftServer.newBuilder() .setServerId(raftPeerId) @@ -125,4 +130,95 @@ public class SCMRatisServerImpl implements SCMRatisServer { public List<RaftPeer> getRaftPeers() { return Collections.singletonList(new RaftPeer(raftPeerId)); } + + + /** + * If the SCM group starts from {@link ScmConfigKeys#OZONE_SCM_NAMES}, + * its raft peers should locate on different nodes, and use the same port + * to communicate with each other. + * + * Each of the raft peer figures out its {@link RaftPeerId} by computing + * its position in {@link ScmConfigKeys#OZONE_SCM_NAMES}. + * + * Assume {@link ScmConfigKeys#OZONE_SCM_NAMES} is "ip0,ip1,ip2", + * scm with ip0 identifies its {@link RaftPeerId} as scm0, + * scm with ip1 identifies its {@link RaftPeerId} as scm1, + * scm with ip2 identifies its {@link RaftPeerId} as scm2. + * + * After startup, they will form a {@link RaftGroup} with groupID + * "SCM-HA-Service", and communicate with each other via + * ozone.scm.ha.ratis.bind.port. + */ + private static class SCMHAGroupBuilder { + private final static String SCM_SERVICE_ID = "SCM-HA-Service"; + + private final RaftGroupId raftGroupId; + private final RaftGroup raftGroup; + private RaftPeerId selfPeerId; + + /** + * @return raft group + */ + public RaftGroup getRaftGroup() { + return raftGroup; + } + + /** + * @return raft group id + */ + public RaftGroupId getRaftGroupId() { + return raftGroupId; + } + + /** + * @return raft peer id + */ + public RaftPeerId getPeerId() { + return selfPeerId; + } + + SCMHAGroupBuilder(final SCMHAConfiguration haConf, + final ConfigurationSource conf) throws IOException { + // fetch port + int port = haConf.getRatisBindAddress().getPort(); + + // fetch localhost + InetAddress localHost = InetAddress.getLocalHost(); + + // fetch hosts from ozone.scm.names + List<String> hosts = + Arrays.stream(conf.getTrimmedStrings(ScmConfigKeys.OZONE_SCM_NAMES)) + .map(scmName -> HddsUtils.getHostName(scmName).get()) + .collect(Collectors.toList()); + + final List<RaftPeer> raftPeers = new ArrayList<>(); + for (int i = 0; i < hosts.size(); ++i) { + String nodeId = "scm" + i; + RaftPeerId peerId = RaftPeerId.getRaftPeerId(nodeId); + + String host = hosts.get(i); + if (InetAddress.getByName(host).equals(localHost)) { + selfPeerId = peerId; + } + + raftPeers.add(new RaftPeer(peerId, host + ":" + port)); + } + + if (selfPeerId == null) { + String errorMessage = "localhost " + localHost + + " does not exist in ozone.scm.names " + + conf.get(ScmConfigKeys.OZONE_SCM_NAMES); + throw new IOException(errorMessage); + } + + LOG.info("Build a RaftGroup for SCMHA, " + + "localHost: {}, OZONE_SCM_NAMES: {}, selfPeerId: {}", + localHost, conf.get(ScmConfigKeys.OZONE_SCM_NAMES), selfPeerId); + + raftGroupId = RaftGroupId.valueOf(UUID.nameUUIDFromBytes( + SCM_SERVICE_ID.getBytes(StandardCharsets.UTF_8))); + + raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org