This is an automated email from the ASF dual-hosted git repository. licheng pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 8c25beb6a03be702f29e576e428a93d67ae0aafe Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Fri Mar 27 17:22:40 2020 +0800 HDDS-3185 Construct a standalone ratis server for SCM. (#720) Contributed-by: Li Cheng <timmych...@tencent.com> --- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 87 ++++ .../java/org/apache/hadoop/ozone/OzoneConsts.java | 5 +- .../common/src/main/resources/ozone-default.xml | 180 ++++++++ .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 37 ++ .../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java | 169 ++++++++ .../apache/hadoop/hdds/scm/ha/package-info.java | 22 + .../hadoop/hdds/scm/ratis/SCMRatisServer.java | 461 +++++++++++++++++++++ .../hadoop/hdds/scm/ratis/SCMStateMachine.java | 35 ++ .../hdds/scm/server/StorageContainerManager.java | 29 +- .../hadoop/hdds/scm/ratis/TestSCMRatisServer.java | 158 +++++++ .../hadoop/ozone/TestOzoneConfigurationFields.java | 3 + 11 files changed, 1183 insertions(+), 3 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java index c6b1100..37a1833 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java @@ -274,6 +274,16 @@ public final class ScmConfigKeys { // able to send back a new list to the datanodes. public static final String OZONE_SCM_NAMES = "ozone.scm.names"; + public static final String OZONE_SCM_INTERNAL_SERVICE_ID = + "ozone.scm.internal.service.id"; + + public static final String OZONE_SCM_SERVICE_IDS_KEY = + "ozone.scm.service.ids"; + public static final String OZONE_SCM_NODES_KEY = + "ozone.scm.nodes"; + public static final String OZONE_SCM_NODE_ID_KEY = + "ozone.scm.node.id"; + public static final int OZONE_SCM_DEFAULT_PORT = OZONE_SCM_DATANODE_PORT_DEFAULT; // The path where datanode ID is to be written to. 
@@ -357,6 +367,83 @@ public final class ScmConfigKeys { public static final String HDDS_TRACING_ENABLED = "hdds.tracing.enabled"; public static final boolean HDDS_TRACING_ENABLED_DEFAULT = false; + // SCM Ratis related + public static final String OZONE_SCM_HA_ENABLE_KEY + = "ozone.scm.ratis.enable"; + public static final boolean OZONE_SCM_HA_ENABLE_DEFAULT + = false; + public static final String OZONE_SCM_RATIS_PORT_KEY + = "ozone.scm.ratis.port"; + public static final int OZONE_SCM_RATIS_PORT_DEFAULT + = 9864; + public static final String OZONE_SCM_RATIS_RPC_TYPE_KEY + = "ozone.scm.ratis.rpc.type"; + public static final String OZONE_SCM_RATIS_RPC_TYPE_DEFAULT + = "GRPC"; + + // SCM Ratis Log configurations + public static final String OZONE_SCM_RATIS_STORAGE_DIR + = "ozone.scm.ratis.storage.dir"; + public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_KEY + = "ozone.scm.ratis.segment.size"; + public static final String OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT + = "16KB"; + public static final String OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY + = "ozone.scm.ratis.segment.preallocated.size"; + public static final String OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT + = "16KB"; + + // SCM Ratis Log Appender configurations + public static final String + OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS = + "ozone.scm.ratis.log.appender.queue.num-elements"; + public static final int + OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT = 1024; + public static final String OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT = + "ozone.scm.ratis.log.appender.queue.byte-limit"; + public static final String + OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT = "32MB"; + public static final String OZONE_SCM_RATIS_LOG_PURGE_GAP = + "ozone.scm.ratis.log.purge.gap"; + public static final int OZONE_SCM_RATIS_LOG_PURGE_GAP_DEFAULT = 1000000; + + // SCM Ratis server configurations + public static final String OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_KEY + = 
"ozone.scm.ratis.server.request.timeout"; + public static final TimeDuration + OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT + = TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS); + public static final String + OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY + = "ozone.scm.ratis.server.retry.cache.timeout"; + public static final TimeDuration + OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT + = TimeDuration.valueOf(600000, TimeUnit.MILLISECONDS); + public static final String OZONE_SCM_RATIS_MINIMUM_TIMEOUT_KEY + = "ozone.scm.ratis.minimum.timeout"; + public static final TimeDuration OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT + = TimeDuration.valueOf(1, TimeUnit.SECONDS); + + // SCM Ratis Leader Election configurations + public static final String + OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY = + "ozone.scm.leader.election.minimum.timeout.duration"; + public static final TimeDuration + OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT = + TimeDuration.valueOf(1, TimeUnit.SECONDS); + public static final String OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY + = "ozone.scm.ratis.server.failure.timeout.duration"; + public static final TimeDuration + OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT + = TimeDuration.valueOf(120, TimeUnit.SECONDS); + + // SCM Leader server role check interval + public static final String OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY + = "ozone.scm.ratis.server.role.check.interval"; + public static final TimeDuration + OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT + = TimeDuration.valueOf(15, TimeUnit.SECONDS); + /** * Never constructed. 
*/ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index b31f80d..bb8e145 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -345,10 +345,11 @@ public final class OzoneConsts { public static final String GDPR_LENGTH = "length"; public static final String GDPR_SECRET = "secret"; public static final String GDPR_ALGORITHM = "algorithm"; - - + // Transaction Info public static final String TRANSACTION_INFO_KEY = "#TRANSACTIONINFO"; public static final String TRANSACTION_INFO_SPLIT_KEY = "#"; + // SCM HA + public static final String SCM_SERVICE_ID_DEFAULT = "scmServiceIdDefault"; } diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index 26bf2ba..8657939 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1906,6 +1906,186 @@ <tag>OZONE, HDDS, SECURITY</tag> <description>SCM security server port.</description> </property> + <property> + <name>ozone.scm.service.ids</name> + <value></value> + <tag>OZONE, SCM, HA</tag> + <description> + Comma-separated list of SCM service Ids. This property allows the client + to figure out the quorum of SCM addresses. + </description> + </property> + <property> + <name>ozone.scm.internal.service.id</name> + <value></value> + <tag>OZONE, SCM, HA</tag> + <description> + Service ID of the SCM. If this is not set, fall back to + ozone.scm.service.ids to find the service ID it belongs to. + </description> + </property> + <property> + <name>ozone.scm.nodes.EXAMPLESCMSERVICEID</name> + <value></value> + <tag>OZONE, SCM, HA</tag> + <description> + Comma-separated list of SCM node Ids for a given SCM service ID (eg. + EXAMPLESCMSERVICEID).
The SCM service ID should be the value (one of the + values if there are multiple) set for the parameter ozone.scm.service.ids. + + Unique identifiers for each SCM Node, delimited by commas. This will be + used by SCMs in HA setup to determine all the SCMs + belonging to the same SCM in the cluster. For example, if you + used “scmService1” as the SCM service ID previously, and you wanted to + use “scm1”, “scm2” and "scm3" as the individual IDs of the SCMs, + you would configure a property ozone.scm.nodes.scmService1, and its value + "scm1,scm2,scm3". + </description> + </property> + <property> + <name>ozone.scm.node.id</name> + <value></value> + <tag>OZONE, SCM, HA</tag> + <description> + The ID of this SCM node. If the SCM node ID is not configured it + is determined automatically by matching the local node's address + with the configured address. + + If node ID is not deterministic from the configuration, then it is set + to the scmId from the SCM version file. + </description> + </property> + <property> + <name>ozone.scm.ratis.enable</name> + <value>false</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>Property to enable or disable Ratis server on SCM. + Please note - this is a temporary property to disable SCM Ratis server. + </description> + </property> + + <property> + <name>ozone.scm.ratis.port</name> + <value>9872</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description> + The port number of the SCM's Ratis server. + </description> + </property> + + <property> + <name>ozone.scm.ratis.rpc.type</name> + <value>GRPC</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>Ratis supports different kinds of transports like netty, GRPC, + Hadoop RPC etc. This picks one of those for this cluster. + </description> + </property> + + <property> + <name>ozone.scm.ratis.storage.dir</name> + <value/> + <tag>OZONE, SCM, HA, RATIS, STORAGE</tag> + <description>This directory is used for storing SCM's Ratis metadata like + logs. 
If this is not set then the default metadata dirs are used. A warning + will be logged if this is not set. Ideally, this should be mapped to a + fast disk like an SSD. + If undefined, SCM ratis storage dir will fall back to ozone.metadata.dirs. + This fallback approach is not recommended for production environments. + </description> + </property> + + <property> + <name>ozone.scm.ratis.segment.size</name> + <value>16KB</value> + <tag>OZONE, SCM, HA, RATIS, PERFORMANCE</tag> + <description>The size of the raft segment used by Apache Ratis on SCM. + (16 KB by default) + </description> + </property> + + <property> + <name>ozone.scm.ratis.segment.preallocated.size</name> + <value>16KB</value> + <tag>OZONE, SCM, HA, RATIS, PERFORMANCE</tag> + <description>The size of the buffer which is preallocated for raft segment + used by Apache Ratis on SCM (16 KB by default). + </description> + </property> + + <property> + <name>ozone.scm.ratis.log.appender.queue.num-elements</name> + <value>1024</value> + <tag>OZONE, DEBUG, SCM, HA, RATIS</tag> + <description>Number of operations pending with Raft's Log Worker. + </description> + </property> + <property> + <name>ozone.scm.ratis.log.appender.queue.byte-limit</name> + <value>32MB</value> + <tag>OZONE, DEBUG, SCM, HA, RATIS</tag> + <description>Byte limit for Raft's Log Worker queue. + </description> + </property> + <property> + <name>ozone.scm.ratis.log.purge.gap</name> + <value>1000000</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>The minimum gap between log indices for Raft server to purge + its log segments after taking a snapshot.
+ </description> + </property> + <property> + <name>ozone.scm.ratis.server.request.timeout</name> + <value>3s</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>The timeout duration for SCM's ratis server requests.</description> + </property> + + <property> + <name>ozone.scm.ratis.server.retry.cache.timeout</name> + <value>600000ms</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>Retry Cache entry timeout for SCM's ratis server.</description> + </property> + + <property> + <name>ozone.scm.ratis.minimum.timeout</name> + <value>1s</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>The minimum timeout duration for SCM's Ratis server rpc. + </description> + </property> + + <property> + <name>ozone.scm.leader.election.minimum.timeout.duration</name> + <value>1s</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>The minimum timeout duration for SCM ratis leader election. + Default is 1s. + </description> + </property> + + <property> + <name>ozone.scm.ratis.server.failure.timeout.duration</name> + <value>120s</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>The timeout duration for ratis server failure detection, + once the threshold has been reached, the ratis state machine will be informed + about the failure in the ratis ring. + </description> + </property> + + <property> + <name>ozone.scm.ratis.server.role.check.interval</name> + <value>15s</value> + <tag>OZONE, SCM, HA, RATIS</tag> + <description>The interval between SCM leader performing a role + check on its ratis server. Ratis server informs SCM if it + loses the leader role.
The scheduled check is a secondary + check to ensure that the leader role is updated periodically + .</description> + </property> <property> <name>hdds.metadata.dir</name> diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java new file mode 100644 index 0000000..c0364ad --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; + +/** + * Utility class used by SCM HA. + */ +public final class SCMHAUtils { + private SCMHAUtils() { + // not used + } + + // Check if SCM HA is enabled.
+ public static boolean isSCMHAEnabled(OzoneConfiguration conf) { + return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, + ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java new file mode 100644 index 0000000..8d66187 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java @@ -0,0 +1,169 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.ha; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.OzoneConsts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; + +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID; + +/** + * Construct SCM node details. 
+ */ +public final class SCMNodeDetails { + private String scmServiceId; + private String scmNodeId; + private InetSocketAddress rpcAddress; + private int rpcPort; + private int ratisPort; + private String httpAddress; + private String httpsAddress; + + public static final Logger LOG = + LoggerFactory.getLogger(SCMNodeDetails.class); + + /** + * Constructs SCMNodeDetails object. + */ + private SCMNodeDetails(String serviceId, String nodeId, + InetSocketAddress rpcAddr, int rpcPort, int ratisPort, + String httpAddress, String httpsAddress) { + this.scmServiceId = serviceId; + this.scmNodeId = nodeId; + this.rpcAddress = rpcAddr; + this.rpcPort = rpcPort; + this.ratisPort = ratisPort; + this.httpAddress = httpAddress; + this.httpsAddress = httpsAddress; + } + + @Override + public String toString() { + return "SCMNodeDetails[" + + "scmServiceId=" + scmServiceId + + ", scmNodeId=" + scmNodeId + + ", rpcAddress=" + rpcAddress + + ", rpcPort=" + rpcPort + + ", ratisPort=" + ratisPort + + ", httpAddress=" + httpAddress + + ", httpsAddress=" + httpsAddress + + "]"; + } + + /** + * Builder class for SCMNodeDetails. 
+ */ + public static class Builder { + private String scmServiceId; + private String scmNodeId; + private InetSocketAddress rpcAddress; + private int rpcPort; + private int ratisPort; + private String httpAddr; + private String httpsAddr; + + public Builder setRpcAddress(InetSocketAddress rpcAddr) { + this.rpcAddress = rpcAddr; + this.rpcPort = rpcAddress.getPort(); + return this; + } + + public Builder setRatisPort(int port) { + this.ratisPort = port; + return this; + } + + public Builder setSCMServiceId(String serviceId) { + this.scmServiceId = serviceId; + return this; + } + + public Builder setSCMNodeId(String nodeId) { + this.scmNodeId = nodeId; + return this; + } + + public Builder setHttpAddress(String httpAddress) { + this.httpAddr = httpAddress; + return this; + } + + public Builder setHttpsAddress(String httpsAddress) { + this.httpsAddr = httpsAddress; + return this; + } + + public SCMNodeDetails build() { + return new SCMNodeDetails(scmServiceId, scmNodeId, rpcAddress, rpcPort, + ratisPort, httpAddr, httpsAddr); + } + } + + public String getSCMServiceId() { + return scmServiceId; + } + + public String getSCMNodeId() { + return scmNodeId; + } + + public InetSocketAddress getRpcAddress() { + return rpcAddress; + } + + public InetAddress getAddress() { + return rpcAddress.getAddress(); + } + + public int getRatisPort() { + return ratisPort; + } + + public int getRpcPort() { + return rpcPort; + } + + public String getRpcAddressString() { + return NetUtils.getHostPortString(rpcAddress); + } + + public static SCMNodeDetails initStandAlone( + OzoneConfiguration conf) throws IOException { + String localSCMServiceId = conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID); + int ratisPort = conf.getInt( + ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT); + InetSocketAddress rpcAddress = new InetSocketAddress( + InetAddress.getLocalHost(), 0); + SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder() + .setRatisPort(ratisPort) + 
.setRpcAddress(rpcAddress) + .setSCMNodeId(localSCMServiceId) + .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT) + .build(); + return scmNodeDetails; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java new file mode 100644 index 0000000..06fe168 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/package-info.java @@ -0,0 +1,22 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.ha; + +/** + * This package contains classes related to SCM HA. + */ diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java new file mode 100644 index 0000000..af1e5c2 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java @@ -0,0 +1,461 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.ratis; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.StorageUnit; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.ratis.RaftConfigKeys; +import org.apache.ratis.conf.RaftProperties; +import org.apache.ratis.grpc.GrpcConfigKeys; +import org.apache.ratis.netty.NettyConfigKeys; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.ClientId; +import org.apache.ratis.protocol.GroupInfoReply; +import org.apache.ratis.protocol.GroupInfoRequest; +import org.apache.ratis.protocol.RaftGroup; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.protocol.RaftPeer; +import org.apache.ratis.protocol.RaftPeerId; +import org.apache.ratis.rpc.RpcType; +import org.apache.ratis.rpc.SupportedRpcType; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; +import 
org.apache.ratis.util.LifeCycle; +import org.apache.ratis.util.SizeInBytes; +import org.apache.ratis.util.TimeDuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +/** + * Class for SCM Ratis Server. + */ +public final class SCMRatisServer { + private static final Logger LOG = LoggerFactory + .getLogger(SCMRatisServer.class); + + private final StorageContainerManager scm; + private final SCMStateMachine scmStateMachine; + + private final int port; + private final InetSocketAddress scmRatisAddress; + private final RaftServer server; + private final RaftGroupId raftGroupId; + private final RaftGroup raftGroup; + private final RaftPeerId raftPeerId; + + private final ClientId clientId = ClientId.randomId(); + private final ScheduledExecutorService scheduledRoleChecker; + private long roleCheckInitialDelayMs = 1000; // 1 second default + private long roleCheckIntervalMs; + private ReentrantReadWriteLock roleCheckLock = new ReentrantReadWriteLock(); + private Optional<RaftProtos.RaftPeerRole> cachedPeerRole = Optional.empty(); + private Optional<RaftPeerId> cachedLeaderPeerId = Optional.empty(); + + private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); + private static long nextCallId() { + return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; + } + + private SCMRatisServer(Configuration conf, + StorageContainerManager scm, + String raftGroupIdStr, RaftPeerId localRaftPeerId, + InetSocketAddress addr, List<RaftPeer> raftPeers) + 
throws IOException { + this.scm = scm; + this.scmRatisAddress = addr; + this.port = addr.getPort(); + RaftProperties serverProperties = newRaftProperties(conf); + + this.raftPeerId = localRaftPeerId; + this.raftGroupId = RaftGroupId.valueOf( + getRaftGroupIdFromOmServiceId(raftGroupIdStr)); + this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeers); + + StringBuilder raftPeersStr = new StringBuilder(); + for (RaftPeer peer : raftPeers) { + raftPeersStr.append(", ").append(peer.getAddress()); + } + LOG.info("Instantiating SCM Ratis server with GroupID: {} and " + + "Raft Peers: {}", raftGroupIdStr, raftPeersStr.toString().substring(2)); + this.scmStateMachine = getStateMachine(); + + this.server = RaftServer.newBuilder() + .setServerId(this.raftPeerId) + .setGroup(this.raftGroup) + .setProperties(serverProperties) + .setStateMachine(scmStateMachine) + .build(); + + // Run a scheduler to check and update the server role on the leader + // periodically + this.scheduledRoleChecker = Executors.newSingleThreadScheduledExecutor(); + this.scheduledRoleChecker.scheduleWithFixedDelay(new Runnable() { + @Override + public void run() { + // Run this check only on the leader OM + if (cachedPeerRole.isPresent() && + cachedPeerRole.get() == RaftProtos.RaftPeerRole.LEADER) { + updateServerRole(); + } + } + }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS); + } + + public static SCMRatisServer newSCMRatisServer( + Configuration conf, StorageContainerManager scm, + SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers) + throws IOException { + String scmServiceId = scmNodeDetails.getSCMServiceId(); + + String scmNodeId = scmNodeDetails.getSCMNodeId(); + RaftPeerId localRaftPeerId = RaftPeerId.getRaftPeerId(scmNodeId); + InetSocketAddress ratisAddr = new InetSocketAddress( + scmNodeDetails.getAddress(), scmNodeDetails.getRatisPort()); + + RaftPeer localRaftPeer = new RaftPeer(localRaftPeerId, ratisAddr); + + List<RaftPeer> raftPeers = new ArrayList<>(); + 
raftPeers.add(localRaftPeer); + + for (SCMNodeDetails peer : peers) { + String peerNodeId = peer.getSCMNodeId(); + InetSocketAddress peerRatisAddr = new InetSocketAddress( + peer.getAddress(), peer.getRatisPort()); + RaftPeerId raftPeerId = RaftPeerId.valueOf(peerNodeId); + RaftPeer raftPeer = new RaftPeer(raftPeerId, peerRatisAddr); + // Add other SCMs in Ratis ring + raftPeers.add(raftPeer); + } + + return new SCMRatisServer(conf, scm, scmServiceId, localRaftPeerId, + ratisAddr, raftPeers); + } + + private UUID getRaftGroupIdFromOmServiceId(String scmServiceId) { + return UUID.nameUUIDFromBytes(scmServiceId.getBytes( + StandardCharsets.UTF_8)); + } + + private SCMStateMachine getStateMachine() { + return new SCMStateMachine(this); + } + + private RaftProperties newRaftProperties(Configuration conf){ + final RaftProperties properties = new RaftProperties(); + // Set RPC type + final String rpcType = conf.get( + ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_RPC_TYPE_DEFAULT); + final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType); + RaftConfigKeys.Rpc.setType(properties, rpc); + // Set the ratis port number + if (rpc == SupportedRpcType.GRPC) { + GrpcConfigKeys.Server.setPort(properties, port); + } else if (rpc == SupportedRpcType.NETTY) { + NettyConfigKeys.Server.setPort(properties, port); + } + // Set Ratis storage directory + String storageDir = SCMRatisServer.getSCMRatisDirectory(conf); + RaftServerConfigKeys.setStorageDirs(properties, + Collections.singletonList(new File(storageDir))); + // Set RAFT segment size + final int raftSegmentSize = (int) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_SIZE_DEFAULT, + StorageUnit.BYTES); + RaftServerConfigKeys.Log.setSegmentSizeMax(properties, + SizeInBytes.valueOf(raftSegmentSize)); + // Set RAFT segment pre-allocated size + final int raftSegmentPreallocatedSize = (int) conf.getStorageSize( + 
ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT, + StorageUnit.BYTES); + int logAppenderQueueNumElements = conf.getInt( + ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS, + ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_NUM_ELEMENTS_DEFAULT); + final int logAppenderQueueByteLimit = (int) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT, + ScmConfigKeys.OZONE_SCM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT, + StorageUnit.BYTES); + RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties, + logAppenderQueueNumElements); + RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties, + SizeInBytes.valueOf(logAppenderQueueByteLimit)); + RaftServerConfigKeys.Log.setPreallocatedSize(properties, + SizeInBytes.valueOf(raftSegmentPreallocatedSize)); + RaftServerConfigKeys.Log.Appender.setInstallSnapshotEnabled(properties, + false); + final int logPurgeGap = conf.getInt( + ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP, + ScmConfigKeys.OZONE_SCM_RATIS_LOG_PURGE_GAP_DEFAULT); + RaftServerConfigKeys.Log.setPurgeGap(properties, logPurgeGap); + // For grpc set the maximum message size + // TODO: calculate the optimal max message size + GrpcConfigKeys.setMessageSizeMax(properties, + SizeInBytes.valueOf(logAppenderQueueByteLimit)); + + // Set the server request timeout + TimeUnit serverRequestTimeoutUnit = + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT.getUnit(); + long serverRequestTimeoutDuration = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_REQUEST_TIMEOUT_DEFAULT + .getDuration(), serverRequestTimeoutUnit); + final TimeDuration serverRequestTimeout = TimeDuration.valueOf( + serverRequestTimeoutDuration, serverRequestTimeoutUnit); + RaftServerConfigKeys.Rpc.setRequestTimeout(properties, + serverRequestTimeout); + // Set timeout for server retry cache 
entry + TimeUnit retryCacheTimeoutUnit = ScmConfigKeys + .OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT.getUnit(); + long retryCacheTimeoutDuration = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_RETRY_CACHE_TIMEOUT_DEFAULT + .getDuration(), retryCacheTimeoutUnit); + final TimeDuration retryCacheTimeout = TimeDuration.valueOf( + retryCacheTimeoutDuration, retryCacheTimeoutUnit); + RaftServerConfigKeys.RetryCache.setExpiryTime(properties, + retryCacheTimeout); + // Set the server min and max timeout + TimeUnit serverMinTimeoutUnit = + ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT.getUnit(); + long serverMinTimeoutDuration = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_MINIMUM_TIMEOUT_DEFAULT + .getDuration(), serverMinTimeoutUnit); + final TimeDuration serverMinTimeout = TimeDuration.valueOf( + serverMinTimeoutDuration, serverMinTimeoutUnit); + long serverMaxTimeoutDuration = + serverMinTimeout.toLong(TimeUnit.MILLISECONDS) + 200; + final TimeDuration serverMaxTimeout = TimeDuration.valueOf( + serverMaxTimeoutDuration, serverMinTimeoutUnit); + RaftServerConfigKeys.Rpc.setTimeoutMin(properties, + serverMinTimeout); + RaftServerConfigKeys.Rpc.setTimeoutMax(properties, + serverMaxTimeout); + // Set the number of maximum cached segments + RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2); + // TODO: set max write buffer size + // Set the ratis leader election timeout + TimeUnit leaderElectionMinTimeoutUnit = + ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT + .getUnit(); + long leaderElectionMinTimeoutduration = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT + .getDuration(), leaderElectionMinTimeoutUnit); + final TimeDuration leaderElectionMinTimeout = 
TimeDuration.valueOf( + leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit); + RaftServerConfigKeys.Rpc.setTimeoutMin(properties, + leaderElectionMinTimeout); + long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong( + TimeUnit.MILLISECONDS) + 200; + RaftServerConfigKeys.Rpc.setTimeoutMax(properties, + TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS)); + TimeUnit nodeFailureTimeoutUnit = + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT + .getUnit(); + long nodeFailureTimeoutDuration = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT + .getDuration(), nodeFailureTimeoutUnit); + final TimeDuration nodeFailureTimeout = TimeDuration.valueOf( + nodeFailureTimeoutDuration, nodeFailureTimeoutUnit); + RaftServerConfigKeys.Notification.setNoLeaderTimeout(properties, + nodeFailureTimeout); + RaftServerConfigKeys.Rpc.setSlownessTimeout(properties, + nodeFailureTimeout); + + // Ratis leader role check + TimeUnit roleCheckIntervalUnit = + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT + .getUnit(); + long roleCheckIntervalDuration = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_SERVER_ROLE_CHECK_INTERVAL_DEFAULT + .getDuration(), nodeFailureTimeoutUnit); + this.roleCheckIntervalMs = TimeDuration.valueOf( + roleCheckIntervalDuration, roleCheckIntervalUnit) + .toLong(TimeUnit.MILLISECONDS); + this.roleCheckInitialDelayMs = leaderElectionMinTimeout + .toLong(TimeUnit.MILLISECONDS); + + return properties; + } + + /** + * Start the Ratis server. + * @throws IOException + */ + public void start() throws IOException { + LOG.info("Starting {} {} at port {}", getClass().getSimpleName(), + server.getId(), port); + server.start(); + } + + /** + * Stop the Ratis server. 
+ */ + public void stop() { + try { + server.close(); + scmStateMachine.stop(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + private boolean checkCachedPeerRoleIsLeader() { + this.roleCheckLock.readLock().lock(); + try { + if (cachedPeerRole.isPresent() && + cachedPeerRole.get() == RaftProtos.RaftPeerRole.LEADER) { + return true; + } + return false; + } finally { + this.roleCheckLock.readLock().unlock(); + } + } + + public boolean isLeader() { + if (checkCachedPeerRoleIsLeader()) { + return true; + } + + // Get the server role from ratis server and update the cached values. + updateServerRole(); + + // After updating the server role, check and return if leader or not. + return checkCachedPeerRoleIsLeader(); + } + + @VisibleForTesting + public LifeCycle.State getServerState() { + return server.getLifeCycleState(); + } + + @VisibleForTesting + public RaftPeerId getRaftPeerId() { + return this.raftPeerId; + } + + public RaftGroup getRaftGroup() { + return this.raftGroup; + } + + /** + * Get the local directory where ratis logs will be stored. 
+ */ + public static String getSCMRatisDirectory(Configuration conf) { + String storageDir = conf.get(ScmConfigKeys.OZONE_SCM_RATIS_STORAGE_DIR); + + if (Strings.isNullOrEmpty(storageDir)) { + storageDir = ServerUtils.getDefaultRatisDirectory(conf); + } + return storageDir; + } + + public Optional<RaftPeerId> getCachedLeaderPeerId() { + this.roleCheckLock.readLock().lock(); + try { + return cachedLeaderPeerId; + } finally { + this.roleCheckLock.readLock().unlock(); + } + } + + public int getServerPort() { + return port; + } + + public void updateServerRole() { + try { + GroupInfoReply groupInfo = getGroupInfo(); + RaftProtos.RoleInfoProto roleInfoProto = groupInfo.getRoleInfoProto(); + RaftProtos.RaftPeerRole thisNodeRole = roleInfoProto.getRole(); + + if (thisNodeRole.equals(RaftProtos.RaftPeerRole.LEADER)) { + setServerRole(thisNodeRole, raftPeerId); + + } else if (thisNodeRole.equals(RaftProtos.RaftPeerRole.FOLLOWER)) { + ByteString leaderNodeId = roleInfoProto.getFollowerInfo() + .getLeaderInfo().getId().getId(); + // There may be a chance, here we get leaderNodeId as null. For + // example, in 3 node OM Ratis, if 2 SCM nodes are down, there will + // be no leader. + RaftPeerId leaderPeerId = null; + if (leaderNodeId != null && !leaderNodeId.isEmpty()) { + leaderPeerId = RaftPeerId.valueOf(leaderNodeId); + } + + setServerRole(thisNodeRole, leaderPeerId); + + } else { + setServerRole(thisNodeRole, null); + + } + } catch (IOException e) { + LOG.error("Failed to retrieve RaftPeerRole. 
Setting cached role to " + + "{} and resetting leader info.", + RaftProtos.RaftPeerRole.UNRECOGNIZED, e); + setServerRole(null, null); + } + } + + private GroupInfoReply getGroupInfo() throws IOException { + GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId, + raftPeerId, raftGroupId, nextCallId()); + GroupInfoReply groupInfo = server.getGroupInfo(groupInfoRequest); + return groupInfo; + } + + private void setServerRole(RaftProtos.RaftPeerRole currentRole, + RaftPeerId leaderPeerId) { + this.roleCheckLock.writeLock().lock(); + try { + this.cachedPeerRole = Optional.ofNullable(currentRole); + this.cachedLeaderPeerId = Optional.ofNullable(leaderPeerId); + } finally { + this.roleCheckLock.writeLock().unlock(); + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java new file mode 100644 index 0000000..502260a --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.ratis; + +import org.apache.ratis.statemachine.impl.BaseStateMachine; + +/** + * Class for SCM StateMachine. + */ +public class SCMStateMachine extends BaseStateMachine { + //TODO to be implemented + public SCMStateMachine(SCMRatisServer ratisServer) { + + } + + public void stop() { + return; + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index a6cd5d4..6f978bd 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -30,6 +30,7 @@ import java.security.cert.X509Certificate; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.Collections; import java.util.Objects; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; @@ -42,6 +43,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; +import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; +import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer; +import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.scm.ScmConfig; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.block.BlockManager; @@ -90,7 +95,6 @@ import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec; import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.EventQueue; -import 
org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.utils.HddsVersionInfo; import org.apache.hadoop.hdds.utils.LegacyHadoopConfigurationSource; import org.apache.hadoop.hdfs.DFSUtil; @@ -189,6 +193,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl private CertificateServer certificateServer; private GrpcTlsConfig grpcTlsConfig; + // SCM HA related + private SCMRatisServer scmRatisServer; + private JvmPauseMonitor jvmPauseMonitor; private final OzoneConfiguration configuration; private SCMContainerMetrics scmContainerMetrics; @@ -256,6 +263,12 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl loginAsSCMUser(conf); } + if (SCMHAUtils.isSCMHAEnabled(conf)) { + initializeRatisServer(); + } else { + scmRatisServer = null; + } + // Creates the SCM DBs or opens them if it exists. // A valid pointer to the store is required by all the other services below. initalizeMetadataStore(conf, configurator); @@ -1110,4 +1123,18 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl public NetworkTopology getClusterMap() { return this.clusterMap; } + + private void initializeRatisServer() throws IOException { + if (scmRatisServer == null) { + SCMNodeDetails scmNodeDetails = SCMNodeDetails + .initStandAlone(configuration); + //TODO enable Ratis ring + scmRatisServer = SCMRatisServer.newSCMRatisServer(configuration, this, + scmNodeDetails, Collections.EMPTY_LIST); + if (scmRatisServer != null) { + LOG.info("SCM Ratis server initialized at port {}", + scmRatisServer.getServerPort()); + } + } + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java new file mode 100644 index 0000000..f29fb5f --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java @@ -0,0 +1,158 @@ +/* + * Licensed 
to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.ratis; + +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; +import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.util.LifeCycle; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; + +/** + * Test class for SCM Ratis Server. 
+ */ +public class TestSCMRatisServer { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private OzoneConfiguration conf; + private SCMRatisServer scmRatisServer; + private StorageContainerManager scm; + private String scmId; + private SCMNodeDetails scmNodeDetails; + private static final long LEADER_ELECTION_TIMEOUT = 500L; + + @Before + public void init() throws Exception { + conf = new OzoneConfiguration(); + scmId = UUID.randomUUID().toString(); + conf.setTimeDuration( + ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); + int ratisPort = conf.getInt( + ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, + ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT); + InetSocketAddress rpcAddress = new InetSocketAddress( + InetAddress.getLocalHost(), 0); + scmNodeDetails = new SCMNodeDetails.Builder() + .setRatisPort(ratisPort) + .setRpcAddress(rpcAddress) + .setSCMNodeId(scmId) + .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT) + .build(); + + // Standalone SCM Ratis server + initSCM(); + scm = HddsTestUtils.getScm(conf); + scm.start(); + scmRatisServer = SCMRatisServer.newSCMRatisServer( + conf, scm, scmNodeDetails, Collections.EMPTY_LIST); + scmRatisServer.start(); + } + + @After + public void shutdown() { + if (scmRatisServer != null) { + scmRatisServer.stop(); + } + if (scm != null) { + scm.stop(); + } + } + + @Test + public void testStartSCMRatisServer() throws Exception { + Assert.assertEquals("Ratis Server should be in running state", + LifeCycle.State.RUNNING, scmRatisServer.getServerState()); + } + + @Test + public void verifyRaftGroupIdGenerationWithCustomOmServiceId() throws + Exception { + String customScmServiceId = "scmIdCustom123"; + OzoneConfiguration newConf = new OzoneConfiguration(); + String newOmId = UUID.randomUUID().toString(); + String path = GenericTestUtils.getTempPath(newOmId); + Path metaDirPath = Paths.get(path, "scm-meta"); + 
newConf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString()); + newConf.setTimeDuration( + ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, + LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); + int ratisPort = 9873; + InetSocketAddress rpcAddress = new InetSocketAddress( + InetAddress.getLocalHost(), 0); + SCMNodeDetails nodeDetails = new SCMNodeDetails.Builder() + .setRpcAddress(rpcAddress) + .setRatisPort(ratisPort) + .setSCMNodeId(newOmId) + .setSCMServiceId(customScmServiceId) + .build(); + // Starts a single node Ratis server + scmRatisServer.stop(); + SCMRatisServer newScmRatisServer = SCMRatisServer + .newSCMRatisServer(newConf, scm, nodeDetails, + Collections.emptyList()); + newScmRatisServer.start(); + + UUID uuid = UUID.nameUUIDFromBytes(customScmServiceId.getBytes()); + RaftGroupId raftGroupId = newScmRatisServer.getRaftGroup().getGroupId(); + Assert.assertEquals(uuid, raftGroupId.getUuid()); + Assert.assertEquals(raftGroupId.toByteString().size(), 16); + newScmRatisServer.stop(); + } + + private void initSCM() throws IOException { + String clusterId = UUID.randomUUID().toString(); + scmId = UUID.randomUUID().toString(); + + final String path = folder.newFolder().toString(); + Path scmPath = Paths.get(path, "scm-meta"); + Files.createDirectories(scmPath); + conf.set(OZONE_METADATA_DIRS, scmPath.toString()); + SCMStorageConfig scmStore = new SCMStorageConfig(conf); + scmStore.setClusterId(clusterId); + scmStore.setScmId(scmId); + // writes the version file properties + scmStore.initialize(); + } +} diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index b9ae815..41f68b6 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@ -54,6 +54,8 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase { errorIfMissingXmlProps = true; xmlPropsToSkipCompare.add("hadoop.tags.custom"); xmlPropsToSkipCompare.add("ozone.om.nodes.EXAMPLEOMSERVICEID"); + xmlPropsToSkipCompare.add("ozone.scm.nodes.EXAMPLESCMSERVICEID"); + xmlPrefixToSkipCompare.add("ipc.client.rpc-timeout.ms"); addPropertiesNotInXml(); } @@ -65,6 +67,7 @@ public class TestOzoneConfigurationFields extends TestConfigurationFieldsBase { HddsConfigKeys.HDDS_SECURITY_PROVIDER, HddsConfigKeys.HDDS_X509_CRL_NAME, // HDDS-2873 OMConfigKeys.OZONE_OM_NODES_KEY, + ScmConfigKeys.OZONE_SCM_NODES_KEY, OzoneConfigKeys.OZONE_ACL_AUTHORIZER_CLASS_NATIVE, OzoneConfigKeys.OZONE_S3_AUTHINFO_MAX_LIFETIME_KEY, ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org