This is an automated email from the ASF dual-hosted git repository. licheng pushed a commit to branch HDDS-2823 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 61a9bb49dcca4999b780108c61183258274ea0b8 Author: Li Cheng <bloodhell2...@gmail.com> AuthorDate: Fri Apr 17 15:39:03 2020 +0800 HDDS-3187 Construct SCM StateMachine. (#819) Co-authored-by: Li Cheng <timmych...@tencent.com> --- .../java/org/apache/hadoop/ozone/OzoneConsts.java | 5 + .../java/org/apache/hadoop/hdds/scm/ScmUtils.java | 12 ++ .../hdds/scm/container/ContainerManager.java | 5 + .../hdds/scm/container/SCMContainerManager.java | 21 +- .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 42 ++++ .../apache/hadoop/hdds/scm/ha/SCMNodeDetails.java | 18 +- .../hadoop/hdds/scm/ratis/SCMStateMachine.java | 35 --- .../hdds/scm/server/StorageContainerManager.java | 59 ++++- .../scm/{ => server}/ratis/SCMRatisServer.java | 25 ++- .../scm/server/ratis/SCMRatisSnapshotInfo.java | 179 +++++++++++++++ .../hdds/scm/server/ratis/SCMStateMachine.java | 240 +++++++++++++++++++++ .../hdds/scm/{ => server}/ratis/package-info.java | 2 +- .../scm/{ => server}/ratis/TestSCMRatisServer.java | 23 +- .../hdds/scm/server/ratis/TestSCMStateMachine.java | 120 +++++++++++ 14 files changed, 714 insertions(+), 72 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java index 8169766..1c41908 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java @@ -327,4 +327,9 @@ public final class OzoneConsts { // SCM HA public static final String SCM_SERVICE_ID_DEFAULT = "scmServiceIdDefault"; + + // SCM Ratis snapshot file to store the last applied index + public static final String SCM_RATIS_SNAPSHOT_INDEX = "scmRatisSnapshotIndex"; + + public static final String SCM_RATIS_SNAPSHOT_TERM = "scmRatisSnapshotTerm"; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java 
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java index 426341a..bb48654 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ScmUtils.java @@ -25,6 +25,8 @@ import org.apache.hadoop.hdds.scm.safemode.Precheck; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.File; + /** * SCM utility class. */ @@ -48,4 +50,14 @@ public final class ScmUtils { } } + /** + * Create SCM directory file based on given path. + */ + public static File createSCMDir(String dirPath) { + File dirFile = new File(dirPath); + if (!dirFile.mkdirs() && !dirFile.exists()) { + throw new IllegalArgumentException("Unable to create path: " + dirFile); + } + return dirFile; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java index 43c1ced..f17a2f4 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java @@ -189,4 +189,9 @@ public interface ContainerManager extends Closeable { * @param success */ void notifyContainerReportProcessing(boolean isFullReport, boolean success); + + /** + * Flush metadata of container manager if they are required to be persisted. 
+ */ + void flushDB() throws IOException; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java index 9f47608..ee8c689 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java @@ -365,13 +365,20 @@ public class SCMContainerManager implements ContainerManager { } } - /** - * Update deleteTransactionId according to deleteTransactionMap. - * - * @param deleteTransactionMap Maps the containerId to latest delete - * transaction id for the container. - * @throws IOException - */ + @Override + public void flushDB() throws IOException { + if (containerStore != null) { + containerStore.flushDB(true); + } + } + + /** + * Update deleteTransactionId according to deleteTransactionMap. + * + * @param deleteTransactionMap Maps the containerId to latest delete + * transaction id for the container. 
+ * @throws IOException + */ public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap) throws IOException { diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java index c0364ad..eb22566 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java @@ -18,8 +18,15 @@ package org.apache.hadoop.hdds.scm.ha; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.ScmUtils; +import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer; + +import java.io.File; +import java.util.Collection; /** * Utility class used by SCM HA. @@ -34,4 +41,39 @@ public final class SCMHAUtils { return conf.getBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, ScmConfigKeys.OZONE_SCM_HA_ENABLE_DEFAULT); } + + public static File createSCMRatisDir(OzoneConfiguration conf) + throws IllegalArgumentException { + String scmRatisDir = SCMRatisServer.getSCMRatisDirectory(conf); + if (scmRatisDir == null || scmRatisDir.isEmpty()) { + throw new IllegalArgumentException(HddsConfigKeys.OZONE_METADATA_DIRS + + " must be defined."); + } + return ScmUtils.createSCMDir(scmRatisDir); + } + + /** + * Get a collection of all scmNodeIds for the given scmServiceId. + */ + public static Collection<String> getSCMNodeIds(Configuration conf, + String scmServiceId) { + String key = addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId); + return conf.getTrimmedStringCollection(key); + } + + public static String getLocalSCMNodeId(String scmServiceId) { + return addSuffix(ScmConfigKeys.OZONE_SCM_NODES_KEY, scmServiceId); + } + + /** + * Add non empty and non null suffix to a key. 
+ */ + private static String addSuffix(String key, String suffix) { + if (suffix == null || suffix.isEmpty()) { + return key; + } + assert !suffix.startsWith(".") : + "suffix '" + suffix + "' should not already have '.' prepended."; + return key + "." + suffix; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java index 8d66187..2390cb3 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMNodeDetails.java @@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.ha; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.ozone.OzoneConsts; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -29,6 +28,7 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID; +import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_SERVICE_IDS_KEY; /** * Construct SCM node details. @@ -153,6 +153,18 @@ public final class SCMNodeDetails { public static SCMNodeDetails initStandAlone( OzoneConfiguration conf) throws IOException { String localSCMServiceId = conf.getTrimmed(OZONE_SCM_INTERNAL_SERVICE_ID); + if (localSCMServiceId == null) { + // No internal SCM service id is set, so fall back to + // OZONE_SCM_SERVICE_IDS_KEY. 
+ LOG.info("{} is not defined, falling back to {} to find serviceID for " + + "SCM if it is HA enabled cluster", + OZONE_SCM_INTERNAL_SERVICE_ID, OZONE_SCM_SERVICE_IDS_KEY); + localSCMServiceId = conf.getTrimmed( + OZONE_SCM_SERVICE_IDS_KEY); + } else { + LOG.info("ServiceID for SCM is {}", localSCMServiceId); + } + String localSCMNodeId = SCMHAUtils.getLocalSCMNodeId(localSCMServiceId); int ratisPort = conf.getInt( ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT); @@ -161,8 +173,8 @@ public final class SCMNodeDetails { SCMNodeDetails scmNodeDetails = new SCMNodeDetails.Builder() .setRatisPort(ratisPort) .setRpcAddress(rpcAddress) - .setSCMNodeId(localSCMServiceId) - .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT) + .setSCMNodeId(localSCMNodeId) + .setSCMServiceId(localSCMServiceId) .build(); return scmNodeDetails; } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java deleted file mode 100644 index 502260a..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMStateMachine.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.ratis; - -import org.apache.ratis.statemachine.impl.BaseStateMachine; - -/** - * Class for SCM StateMachine. - */ -public class SCMStateMachine extends BaseStateMachine { - //TODO to be implemented - public SCMStateMachine(SCMRatisServer ratisServer) { - - } - - public void stop() { - return; - } -} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 6f978bd..b8527bc 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@ -25,6 +25,13 @@ import javax.management.ObjectName; import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.RemovalListener; +import com.google.protobuf.BlockingService; + +import java.io.File; import java.security.cert.CertificateException; import java.security.cert.X509Certificate; import java.util.Collection; @@ -45,7 +52,8 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState; import org.apache.hadoop.hdds.scm.PlacementPolicy; import org.apache.hadoop.hdds.scm.ha.SCMHAUtils; import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; -import org.apache.hadoop.hdds.scm.ratis.SCMRatisServer; +import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisServer; +import org.apache.hadoop.hdds.scm.server.ratis.SCMRatisSnapshotInfo; import org.apache.hadoop.hdds.utils.HddsServerUtil; import org.apache.hadoop.hdds.scm.ScmConfig; import 
org.apache.hadoop.hdds.scm.ScmConfigKeys; @@ -114,13 +122,9 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod; import org.apache.hadoop.security.authentication.client.AuthenticationException; import org.apache.hadoop.util.JvmPauseMonitor; -import com.google.common.annotations.VisibleForTesting; -import com.google.common.cache.Cache; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.RemovalListener; -import com.google.protobuf.BlockingService; import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT_DEFAULT; import org.apache.ratis.grpc.GrpcTlsConfig; +import org.apache.ratis.server.protocol.TermIndex; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -195,6 +199,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl // SCM HA related private SCMRatisServer scmRatisServer; + private SCMRatisSnapshotInfo scmRatisSnapshotInfo; + private File scmRatisSnapshotDir; private JvmPauseMonitor jvmPauseMonitor; private final OzoneConfiguration configuration; @@ -264,6 +270,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl } if (SCMHAUtils.isSCMHAEnabled(conf)) { + this.scmRatisSnapshotInfo = new SCMRatisSnapshotInfo( + scmStorageConfig.getCurrentDir()); + this.scmRatisSnapshotDir = SCMHAUtils.createSCMRatisDir(conf); initializeRatisServer(); } else { scmRatisServer = null; @@ -796,6 +805,10 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl getClientRpcAddress())); } + if (scmRatisServer != null) { + scmRatisServer.start(); + } + ms = HddsServerUtil .initializeMetrics(configuration, "StorageContainerManager"); @@ -1137,4 +1150,38 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl } } } + + @VisibleForTesting + public SCMRatisServer getScmRatisServer() { + return scmRatisServer; + } + + @VisibleForTesting + public SCMRatisSnapshotInfo getSnapshotInfo() { + return scmRatisSnapshotInfo; + 
} + + @VisibleForTesting + public long getRatisSnapshotIndex() { + return scmRatisSnapshotInfo.getIndex(); + } + + /** + * Save ratis snapshot to SCM meta store and local disk. + */ + public TermIndex saveRatisSnapshot() throws IOException { + TermIndex snapshotIndex = scmRatisServer.getLastAppliedTermIndex(); + if (scmMetadataStore != null) { + // Flush the SCM state to disk + scmMetadataStore.getStore().flush(); + } + + if (containerManager != null) { + containerManager.flushDB(); + } + + scmRatisSnapshotInfo.saveRatisSnapshotToDisk(snapshotIndex); + + return snapshotIndex; + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java similarity index 97% rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java index af1e5c2..77dee6a 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/SCMRatisServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisServer.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.hadoop.hdds.scm.ratis; +package org.apache.hadoop.hdds.scm.server.ratis; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; @@ -42,6 +42,7 @@ import org.apache.ratis.rpc.RpcType; import org.apache.ratis.rpc.SupportedRpcType; import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServerConfigKeys; +import org.apache.ratis.server.protocol.TermIndex; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.util.LifeCycle; import org.apache.ratis.util.SizeInBytes; @@ -94,6 +95,10 @@ public final class SCMRatisServer { return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; } + /** + * Creates a SCM Ratis Server. + * @throws IOException + */ private SCMRatisServer(Configuration conf, StorageContainerManager scm, String raftGroupIdStr, RaftPeerId localRaftPeerId, @@ -139,6 +144,9 @@ public final class SCMRatisServer { }, roleCheckInitialDelayMs, roleCheckIntervalMs, TimeUnit.MILLISECONDS); } + /** + * Create a SCM Ratis Server instance. 
+ */ public static SCMRatisServer newSCMRatisServer( Configuration conf, StorageContainerManager scm, SCMNodeDetails scmNodeDetails, List<SCMNodeDetails> peers) @@ -178,7 +186,7 @@ public final class SCMRatisServer { return new SCMStateMachine(this); } - private RaftProperties newRaftProperties(Configuration conf){ + private RaftProperties newRaftProperties(Configuration conf) { final RaftProperties properties = new RaftProperties(); // Set RPC type final String rpcType = conf.get( @@ -403,6 +411,15 @@ public final class SCMRatisServer { } } + public StorageContainerManager getSCM() { + return scm; + } + + @VisibleForTesting + public SCMStateMachine getScmStateMachine() { + return scmStateMachine; + } + public int getServerPort() { return port; } @@ -441,6 +458,10 @@ public final class SCMRatisServer { } } + public TermIndex getLastAppliedTermIndex() { + return scmStateMachine.getLastAppliedTermIndex(); + } + private GroupInfoReply getGroupInfo() throws IOException { GroupInfoRequest groupInfoRequest = new GroupInfoRequest(clientId, raftPeerId, raftGroupId, nextCallId()); diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java new file mode 100644 index 0000000..11b3234 --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMRatisSnapshotInfo.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package org.apache.hadoop.hdds.scm.server.ratis; + +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.FileInfo; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.yaml.snakeyaml.DumperOptions; +import org.yaml.snakeyaml.Yaml; + +import java.io.File; +import java.io.FileInputStream; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.io.Writer; +import java.util.List; + +import static org.apache.hadoop.ozone.OzoneConsts.SCM_RATIS_SNAPSHOT_INDEX; + +/** + * This class captures the snapshotIndex and term of the latest snapshot in + * the SCM. + * Ratis server loads the snapshotInfo during startup and updates the + * lastApplied index to this snapshotIndex. SCM SnapshotInfo does not contain + * any files. It is used only to store/update the last applied index and term. 
+ */ +public class SCMRatisSnapshotInfo implements SnapshotInfo { + + static final Logger LOG = LoggerFactory.getLogger(SCMRatisSnapshotInfo.class); + + private volatile long term = 0; + private volatile long snapshotIndex = -1; + + private final File ratisSnapshotFile; + + public SCMRatisSnapshotInfo(File ratisDir) throws IOException { + ratisSnapshotFile = new File(ratisDir, SCM_RATIS_SNAPSHOT_INDEX); + loadRatisSnapshotIndex(); + } + + public void updateTerm(long newTerm) { + term = newTerm; + } + + private void updateTermIndex(long newTerm, long newIndex) { + this.term = newTerm; + this.snapshotIndex = newIndex; + } + + /** + * Load the snapshot index and term from the snapshot file on disk, + * if it exists. + * @throws IOException + */ + private void loadRatisSnapshotIndex() throws IOException { + if (ratisSnapshotFile.exists()) { + RatisSnapshotYaml ratisSnapshotYaml = readRatisSnapshotYaml(); + updateTermIndex(ratisSnapshotYaml.term, ratisSnapshotYaml.snapshotIndex); + } + } + + /** + * Read and parse the snapshot yaml file. + */ + private RatisSnapshotYaml readRatisSnapshotYaml() throws IOException { + try (FileInputStream inputFileStream = new FileInputStream( + ratisSnapshotFile)) { + Yaml yaml = new Yaml(); + try { + return yaml.loadAs(inputFileStream, RatisSnapshotYaml.class); + } catch (Exception e) { + throw new IOException("Unable to parse RatisSnapshot yaml file.", e); + } + } + } + + /** + * Update and persist the snapshot index and term to disk. + * @param lastAppliedTermIndex new snapshot index to be persisted to disk. + * @throws IOException + */ + public void saveRatisSnapshotToDisk(TermIndex lastAppliedTermIndex) + throws IOException { + updateTermIndex(lastAppliedTermIndex.getTerm(), + lastAppliedTermIndex.getIndex()); + writeRatisSnapshotYaml(); + LOG.info("Saved Ratis Snapshot on the SCM with snapshotIndex {}", + lastAppliedTermIndex); + } + + /** + * Write snapshot details to disk in yaml format. 
+ */ + private void writeRatisSnapshotYaml() throws IOException { + DumperOptions options = new DumperOptions(); + options.setPrettyFlow(true); + options.setDefaultFlowStyle(DumperOptions.FlowStyle.FLOW); + Yaml yaml = new Yaml(options); + + RatisSnapshotYaml ratisSnapshotYaml = new RatisSnapshotYaml(term, + snapshotIndex); + + try (Writer writer = new OutputStreamWriter( + new FileOutputStream(ratisSnapshotFile), "UTF-8")) { + yaml.dump(ratisSnapshotYaml, writer); + } + } + + @Override + public TermIndex getTermIndex() { + return TermIndex.newTermIndex(term, snapshotIndex); + } + + @Override + public long getTerm() { + return term; + } + + @Override + public long getIndex() { + return snapshotIndex; + } + + @Override + public List<FileInfo> getFiles() { + return null; + } + + /** + * Ratis Snapshot details to be written to the yaml file. + */ + public static class RatisSnapshotYaml { + private long term; + private long snapshotIndex; + + public RatisSnapshotYaml() { + // Needed for snake-yaml introspection. + } + + RatisSnapshotYaml(long term, long snapshotIndex) { + this.term = term; + this.snapshotIndex = snapshotIndex; + } + + public void setTerm(long term) { + this.term = term; + } + + public long getTerm() { + return this.term; + } + + public void setSnapshotIndex(long index) { + this.snapshotIndex = index; + } + + public long getSnapshotIndex() { + return this.snapshotIndex; + } + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java new file mode 100644 index 0000000..b60570b --- /dev/null +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/SCMStateMachine.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.server.ratis; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.ratis.proto.RaftProtos; +import org.apache.ratis.protocol.Message; +import org.apache.ratis.protocol.RaftClientRequest; +import org.apache.ratis.protocol.RaftGroupId; +import org.apache.ratis.server.RaftServer; +import org.apache.ratis.server.protocol.TermIndex; +import org.apache.ratis.server.storage.RaftStorage; +import org.apache.ratis.statemachine.SnapshotInfo; +import org.apache.ratis.statemachine.TransactionContext; +import org.apache.ratis.statemachine.impl.BaseStateMachine; +import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage; +import org.apache.ratis.util.LifeCycle; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Collection; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ThreadFactory; +import 
java.util.concurrent.TimeUnit; + +/** + * Class for SCM StateMachine. + */ +public class SCMStateMachine extends BaseStateMachine { + static final Logger LOG = + LoggerFactory.getLogger(SCMStateMachine.class); + private final SimpleStateMachineStorage storage = + new SimpleStateMachineStorage(); + private final SCMRatisServer scmRatisServer; + private final StorageContainerManager scm; + private RaftGroupId raftGroupId; + private final SCMRatisSnapshotInfo snapshotInfo; + private final ExecutorService executorService; + private final ExecutorService installSnapshotExecutor; + + // Map which contains index and term for the ratis transactions which are + // stateMachine entries which are received through applyTransaction. + private ConcurrentMap<Long, Long> applyTransactionMap = + new ConcurrentSkipListMap<>(); + + /** + * Create a SCM state machine. + */ + public SCMStateMachine(SCMRatisServer ratisServer) { + this.scmRatisServer = ratisServer; + this.scm = ratisServer.getSCM(); + + this.snapshotInfo = scm.getSnapshotInfo(); + updateLastAppliedIndexWithSnaphsotIndex(); + + ThreadFactory build = new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("SCM StateMachine ApplyTransaction Thread - %d").build(); + this.executorService = HadoopExecutors.newSingleThreadExecutor(build); + this.installSnapshotExecutor = HadoopExecutors.newSingleThreadExecutor(); + } + + /** + * Initializes the State Machine with the given server, group and storage. + */ + @Override + public void initialize(RaftServer server, RaftGroupId id, + RaftStorage raftStorage) throws IOException { + lifeCycle.startAndTransition(() -> { + super.initialize(server, id, raftStorage); + this.raftGroupId = id; + storage.init(raftStorage); + }); + } + + /** + * Pre-execute the update request into state machine. 
+ */ + @Override + public TransactionContext startTransaction( + RaftClientRequest raftClientRequest) { + return TransactionContext.newBuilder() + .setClientRequest(raftClientRequest) + .setStateMachine(this) + .setServerRole(RaftProtos.RaftPeerRole.LEADER) + .setLogData(raftClientRequest.getMessage().getContent()) + .build(); + } + + /** + * Apply a committed log entry to state machine. + */ + @Override + public CompletableFuture<Message> applyTransaction(TransactionContext trx) { + CompletableFuture<Message> ratisFuture = + new CompletableFuture<>(); + //TODO execute SCMRequest and process SCMResponse + return ratisFuture; + } + + /** + * Query state machine. + */ + @Override + public CompletableFuture<Message> query(Message request) { + //TODO make handler respond to the query request. + return CompletableFuture.completedFuture(request); + } + + /** + * Pause state machine. + */ + @Override + public void pause() { + lifeCycle.transition(LifeCycle.State.PAUSING); + lifeCycle.transition(LifeCycle.State.PAUSED); + } + + /** + * Unpause state machine and update the lastAppliedIndex. + * Following after uploading new state to state machine. + */ + public void unpause(long newLastAppliedSnaphsotIndex, + long newLastAppliedSnapShotTermIndex) { + lifeCycle.startAndTransition(() -> { + this.setLastAppliedTermIndex(TermIndex.newTermIndex( + newLastAppliedSnapShotTermIndex, newLastAppliedSnaphsotIndex)); + }); + } + + /** + * Take SCM snapshot and write index to file. + * @return actual index or 0 if error. + */ + @Override + public long takeSnapshot() throws IOException { + LOG.info("Saving Ratis snapshot on the SCM."); + if (scm != null) { + return scm.saveRatisSnapshot().getIndex(); + } + return 0; + } + + /** + * Get latest SCM snapshot. 
+ */ + @Override + public SnapshotInfo getLatestSnapshot() { + return snapshotInfo; + } + + private synchronized void updateLastApplied() { + Long appliedTerm = null; + long appliedIndex = -1; + for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) { + final Long removed = applyTransactionMap.remove(i); + if (removed == null) { + break; + } + appliedTerm = removed; + appliedIndex = i; + } + if (appliedTerm != null) { + updateLastAppliedTermIndex(appliedTerm, appliedIndex); + } + } + + /** + * Called to notify state machine about indexes which are processed + * internally by Raft Server; this currently happens when conf entries are + * processed in raft Server. This lets the state machine keep track of index + * updates. + */ + @Override + public void notifyIndexUpdate(long currentTerm, long index) { + applyTransactionMap.put(index, currentTerm); + updateLastApplied(); + snapshotInfo.updateTerm(currentTerm); + } + + /** + * Notifies the state machine that the raft peer is no longer leader. + */ + @Override + public void notifyNotLeader(Collection<TransactionContext> pendingEntries) { + scmRatisServer.updateServerRole(); + } + + /** + * Transfer from log entry to string. + */ + @Override + public String toStateMachineLogEntryString( + RaftProtos.StateMachineLogEntryProto proto) { + //TODO implement transfer from proto to SCMRequest body. + return null; + } + + /** + * Update lastAppliedIndex term in snapshot info. 
+ */ + public void updateLastAppliedIndexWithSnaphsotIndex() { + setLastAppliedTermIndex(TermIndex.newTermIndex(snapshotInfo.getTerm(), + snapshotInfo.getIndex())); + LOG.info("LastAppliedIndex set from SnapShotInfo {}", + getLastAppliedTermIndex()); + } + + @VisibleForTesting + void addApplyTransactionTermIndex(long term, long index) { + applyTransactionMap.put(index, term); + } + + public void stop() { + HadoopExecutors.shutdown(executorService, LOG, 5, TimeUnit.SECONDS); + HadoopExecutors.shutdown(installSnapshotExecutor, LOG, 5, TimeUnit.SECONDS); + } +} diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java similarity index 94% rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java index 4944017..77f4afa 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ratis/package-info.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/ratis/package-info.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.hadoop.hdds.scm.ratis; +package org.apache.hadoop.hdds.scm.server.ratis; /** * This package contains classes related to Apache Ratis for SCM. 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java similarity index 86% rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java index f29fb5f..d6981d3 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ratis/TestSCMRatisServer.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMRatisServer.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.hdds.scm.ratis; +package org.apache.hadoop.hdds.scm.server.ratis; import org.apache.hadoop.hdds.HddsConfigKeys; import org.apache.hadoop.hdds.conf.OzoneConfiguration; @@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys; import org.apache.hadoop.hdds.scm.ha.SCMNodeDetails; import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; import org.apache.hadoop.hdds.scm.server.StorageContainerManager; -import org.apache.hadoop.ozone.OzoneConsts; import org.apache.hadoop.test.GenericTestUtils; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.util.LifeCycle; @@ -59,7 +58,6 @@ public class TestSCMRatisServer { private SCMRatisServer scmRatisServer; private StorageContainerManager scm; private String scmId; - private SCMNodeDetails scmNodeDetails; private static final long LEADER_ELECTION_TIMEOUT = 500L; @Before @@ -69,25 +67,14 @@ public class TestSCMRatisServer { conf.setTimeDuration( ScmConfigKeys.OZONE_SCM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); - int ratisPort = conf.getInt( - ScmConfigKeys.OZONE_SCM_RATIS_PORT_KEY, - ScmConfigKeys.OZONE_SCM_RATIS_PORT_DEFAULT); - InetSocketAddress rpcAddress = new InetSocketAddress( - 
InetAddress.getLocalHost(), 0); - scmNodeDetails = new SCMNodeDetails.Builder() - .setRatisPort(ratisPort) - .setRpcAddress(rpcAddress) - .setSCMNodeId(scmId) - .setSCMServiceId(OzoneConsts.SCM_SERVICE_ID_DEFAULT) - .build(); + conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true); + conf.set(ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID, "scm-ha-test"); // Standalone SCM Ratis server initSCM(); scm = HddsTestUtils.getScm(conf); scm.start(); - scmRatisServer = SCMRatisServer.newSCMRatisServer( - conf, scm, scmNodeDetails, Collections.EMPTY_LIST); - scmRatisServer.start(); + scmRatisServer = scm.getScmRatisServer(); } @After @@ -101,7 +88,7 @@ public class TestSCMRatisServer { } @Test - public void testStartSCMRatisServer() throws Exception { + public void testStartSCMRatisServer() { Assert.assertEquals("Ratis Server should be in running state", LifeCycle.State.RUNNING, scmRatisServer.getServerState()); } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java new file mode 100644 index 0000000..69bc5bd --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/server/ratis/TestSCMStateMachine.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.hadoop.hdds.scm.server.ratis; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.HddsTestUtils; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.server.SCMStorageConfig; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.UUID; + +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; + +/** + * Test class for SCMStateMachine. 
+ */ +public class TestSCMStateMachine { + @Rule + public TemporaryFolder folder = new TemporaryFolder(); + + private SCMStateMachine scmStateMachine; + private StorageContainerManager scm; + private SCMRatisServer scmRatisServer; + private OzoneConfiguration conf; + private String scmId; + @Before + public void init() throws Exception { + conf = new OzoneConfiguration(); + conf.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true); + conf.set(ScmConfigKeys.OZONE_SCM_INTERNAL_SERVICE_ID, "scm-ha-test"); + scmId = UUID.randomUUID().toString(); + + initSCM(); + scm = HddsTestUtils.getScm(conf); + scm.start(); + scmRatisServer = scm.getScmRatisServer(); + scmStateMachine = scm.getScmRatisServer().getScmStateMachine(); + } + + @Test + public void testSCMUpdatedAppliedIndex(){ + // After notifying term 0 and index 0, applied term and index should be 0. + scmStateMachine.notifyIndexUpdate(0, 0); + Assert.assertEquals(0, + scmStateMachine.getLastAppliedTermIndex().getTerm()); + Assert.assertEquals(0, + scmStateMachine.getLastAppliedTermIndex().getIndex()); + + // If only the transactionMap is updated, index should stay 0. + scmStateMachine.addApplyTransactionTermIndex(0, 1); + Assert.assertEquals(0L, + scmStateMachine.getLastAppliedTermIndex().getTerm()); + Assert.assertEquals(0L, + scmStateMachine.getLastAppliedTermIndex().getIndex()); + + // After the index update is notified, the index should increase. + scmStateMachine.notifyIndexUpdate(0, 1); + Assert.assertEquals(0L, + scmStateMachine.getLastAppliedTermIndex().getTerm()); + Assert.assertEquals(1L, + scmStateMachine.getLastAppliedTermIndex().getIndex()); + + // A notifyIndexUpdate on its own also increases the index. + scmStateMachine.notifyIndexUpdate(0, 2); + Assert.assertEquals(0L, + scmStateMachine.getLastAppliedTermIndex().getTerm()); + Assert.assertEquals(2L, + scmStateMachine.getLastAppliedTermIndex().getIndex()); + + // If a non-consecutive index is notified, the index should not be updated. 
+ scmStateMachine.notifyIndexUpdate(0, 5); + Assert.assertEquals(0L, + scmStateMachine.getLastAppliedTermIndex().getTerm()); + Assert.assertEquals(2L, + scmStateMachine.getLastAppliedTermIndex().getIndex()); + } + + private void initSCM() throws IOException { + String clusterId = UUID.randomUUID().toString(); + final String path = folder.newFolder().toString(); + Path scmPath = Paths.get(path, "scm-meta"); + Files.createDirectories(scmPath); + conf.set(OZONE_METADATA_DIRS, scmPath.toString()); + SCMStorageConfig scmStore = new SCMStorageConfig(conf); + scmStore.setClusterId(clusterId); + scmStore.setScmId(scmId); + // writes the version file properties + scmStore.initialize(); + } + + @After + public void cleanup() { + scm.stop(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org