This is an automated email from the ASF dual-hosted git repository.

qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bd0f2cb0f [IOTDB-2848] Consensus snapshot interface (#5497)
6bd0f2cb0f is described below

commit 6bd0f2cb0fb66f943e46e4f47f8ca8ab11f3e1cf
Author: SzyWilliam <[email protected]>
AuthorDate: Wed Apr 20 12:08:09 2022 +0800

    [IOTDB-2848] Consensus snapshot interface (#5497)
---
 .../statemachine/PartitionRegionStateMachine.java  | 17 ++++++++
 .../SnapshotMeta.java}                             | 41 +++++++++++-------
 .../consensus/standalone/StandAloneServerImpl.java | 18 ++++++++
 .../consensus/statemachine/EmptyStateMachine.java  | 18 ++++++++
 .../consensus/statemachine/IStateMachine.java      | 49 ++++++++++++++++++++++
 .../iotdb/consensus/ratis/RatisConsensusTest.java  | 15 +++++++
 .../standalone/StandAloneConsensusTest.java        | 15 +++++++
 .../statemachine/DataRegionStateMachine.java       | 17 ++++++++
 .../statemachine/SchemaRegionStateMachine.java     | 18 ++++++++
 9 files changed, 193 insertions(+), 15 deletions(-)

diff --git 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 6a97c9953d..38bd8b1305 100644
--- 
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++ 
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc
 import org.apache.iotdb.confignode.physical.PhysicalPlan;
 import org.apache.iotdb.confignode.service.executor.PlanExecutor;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
@@ -32,7 +33,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /** Statemachine for PartitionRegion */
 public class PartitionRegionStateMachine implements IStateMachine {
@@ -95,6 +98,20 @@ public class PartitionRegionStateMachine implements 
IStateMachine {
     return read(plan);
   }
 
+  @Override
+  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+  @Override
+  public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+    return null;
+  }
+
+  @Override
+  public void loadSnapshot(SnapshotMeta latest) {}
+
+  @Override
+  public void cleanUpOldSnapshots(File snapshotDir) {}
+
   /** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
   protected DataSet read(PhysicalPlan plan) {
     DataSet result;
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
 b/consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
similarity index 50%
copy from 
consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
copy to 
consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
index 48848cdb15..e2fd0188d2 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
@@ -16,28 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+package org.apache.iotdb.consensus.common;
 
-package org.apache.iotdb.consensus.statemachine;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
 
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+public class SnapshotMeta {
+  /**
+   * The IConsensus metadata supplied when this snapshot was taken. A more recent snapshot will
+   * have lexicographically larger metadata.
+   */
+  private ByteBuffer metadata;
 
-public class EmptyStateMachine implements IStateMachine {
+  private List<File> snapshotFiles;
 
-  @Override
-  public void start() {}
+  public SnapshotMeta(ByteBuffer metadata, List<File> snapshotFiles) {
+    this.metadata = metadata;
+    this.snapshotFiles = snapshotFiles;
+  }
+
+  public ByteBuffer getMetadata() {
+    return metadata;
+  }
 
-  @Override
-  public void stop() {}
+  public void setMetadata(ByteBuffer metadata) {
+    this.metadata = metadata;
+  }
 
-  @Override
-  public TSStatus write(IConsensusRequest IConsensusRequest) {
-    return new TSStatus(0);
+  public List<File> getSnapshotFiles() {
+    return snapshotFiles;
   }
 
-  @Override
-  public DataSet read(IConsensusRequest IConsensusRequest) {
-    return null;
+  public void setSnapshotFiles(List<File> snapshotFiles) {
+    this.snapshotFiles = snapshotFiles;
   }
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index fe76207f48..bb6220a769 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -22,9 +22,13 @@ package org.apache.iotdb.consensus.standalone;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.statemachine.IStateMachine;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+
 public class StandAloneServerImpl implements IStateMachine {
 
   private final Peer peer;
@@ -62,4 +66,18 @@ public class StandAloneServerImpl implements IStateMachine {
   public DataSet read(IConsensusRequest request) {
     return stateMachine.read(request);
   }
+
+  @Override
+  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+  @Override
+  public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+    return null;
+  }
+
+  @Override
+  public void loadSnapshot(SnapshotMeta latest) {}
+
+  @Override
+  public void cleanUpOldSnapshots(File snapshotDir) {}
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
index 48848cdb15..1f66bf1d87 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
@@ -21,8 +21,12 @@ package org.apache.iotdb.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+
 public class EmptyStateMachine implements IStateMachine {
 
   @Override
@@ -40,4 +44,18 @@ public class EmptyStateMachine implements IStateMachine {
   public DataSet read(IConsensusRequest IConsensusRequest) {
     return null;
   }
+
+  @Override
+  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+  @Override
+  public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+    return null;
+  }
+
+  @Override
+  public void loadSnapshot(SnapshotMeta latest) {}
+
+  @Override
+  public void cleanUpOldSnapshots(File snapshotDir) {}
 }
diff --git 
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
 
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
index e573113a79..5279e235a4 100644
--- 
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
+++ 
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.consensus.statemachine;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.consensus.ConsensusGroupId;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 
+import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.function.Function;
 
 public interface IStateMachine {
@@ -34,7 +37,53 @@ public interface IStateMachine {
 
   void stop();
 
+  /**
+   * Apply a write request from the user.
+   *
+   * @param IConsensusRequest write request
+   */
   TSStatus write(IConsensusRequest IConsensusRequest);
 
+  /**
+   * Read local data and return it.
+   *
+   * @param IConsensusRequest read request
+   */
   DataSet read(IConsensusRequest IConsensusRequest);
+
+  /**
+   * IConsensus will periodically take a snapshot of both the log and the state machine data.
+   *
+   * @param metadata the metadata IConsensus wants IStateMachine to preserve. NOTICE: a more
+   *     recent snapshot will have lexicographically larger metadata. This property should be
+   *     guaranteed by every IConsensus implementation. IStateMachine can use the metadata to sort
+   *     or label snapshots.
+   * @param snapshotDir the root dir of snapshot files
+   */
+  void takeSnapshot(ByteBuffer metadata, File snapshotDir);
+
+  /**
+   * Called when recovering from a crash or when the leader installs a snapshot on a follower.
+   * IStateMachine is required to find the latest snapshot in snapshotDir.
+   *
+   * @param snapshotDir the root dir of snapshot files
+   * @return latest snapshot info (metadata + snapshot files)
+   */
+  SnapshotMeta getLatestSnapshot(File snapshotDir);
+
+  /**
+   * Called when recovering from a crash or when a follower installs a snapshot from the leader.
+   * IStateMachine is required to load the given snapshot.
+   *
+   * @param latest the latest snapshot to load
+   */
+  void loadSnapshot(SnapshotMeta latest);
+
+  /**
+   * IConsensus will periodically clean up old snapshots. This method is called to inform
+   * IStateMachine to remove outdated snapshots.
+   *
+   * @param snapshotDir the root dir of snapshot files
+   */
+  void cleanUpOldSnapshots(File snapshotDir);
 }
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index f5f29470f4..f0e5093f0a 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.ConsensusGroup;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
@@ -106,6 +107,20 @@ public class RatisConsensusTest {
       dataSet.setNumber(integer.get());
       return dataSet;
     }
+
+    @Override
+    public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+    @Override
+    public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+      return null;
+    }
+
+    @Override
+    public void loadSnapshot(SnapshotMeta latest) {}
+
+    @Override
+    public void cleanUpOldSnapshots(File snapshotDir) {}
   }
 
   private ConsensusGroupId gid;
diff --git 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 2478620c27..b7eb47d975 100644
--- 
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++ 
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
 import org.apache.iotdb.consensus.IConsensus;
 import org.apache.iotdb.consensus.common.DataSet;
 import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
 import org.apache.iotdb.consensus.common.request.IConsensusRequest;
 import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -109,6 +110,20 @@ public class StandAloneConsensusTest {
     public DataSet read(IConsensusRequest request) {
       return null;
     }
+
+    @Override
+    public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+    @Override
+    public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+      return null;
+    }
+
+    @Override
+    public void loadSnapshot(SnapshotMeta latest) {}
+
+    @Override
+    public void cleanUpOldSnapshots(File snapshotDir) {}
   }
 
   @Before
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 28ac5bf5d8..0d99a2d6c2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.consensus.statemachine;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.db.engine.storagegroup.DataRegion;
 import org.apache.iotdb.db.exception.BatchProcessException;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
@@ -37,6 +38,8 @@ import org.apache.iotdb.rpc.RpcUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 
 public class DataRegionStateMachine extends BaseStateMachine {
@@ -58,6 +61,20 @@ public class DataRegionStateMachine extends BaseStateMachine 
{
   @Override
   public void stop() {}
 
+  @Override
+  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+  @Override
+  public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+    return null;
+  }
+
+  @Override
+  public void loadSnapshot(SnapshotMeta latest) {}
+
+  @Override
+  public void cleanUpOldSnapshots(File snapshotDir) {}
+
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
     PlanNode insertNode = fragmentInstance.getFragment().getRoot();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index f964522d29..d18a1276b7 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
 import org.apache.iotdb.db.metadata.Executor.SchemaVisitor;
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
@@ -30,6 +31,9 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
+import java.nio.ByteBuffer;
+
 public class SchemaRegionStateMachine extends BaseStateMachine {
 
   private static final Logger logger = 
LoggerFactory.getLogger(SchemaRegionStateMachine.class);
@@ -48,6 +52,20 @@ public class SchemaRegionStateMachine extends 
BaseStateMachine {
   @Override
   public void stop() {}
 
+  @Override
+  public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+  @Override
+  public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+    return null;
+  }
+
+  @Override
+  public void loadSnapshot(SnapshotMeta latest) {}
+
+  @Override
+  public void cleanUpOldSnapshots(File snapshotDir) {}
+
   @Override
   protected TSStatus write(FragmentInstance fragmentInstance) {
     logger.info("Execute write plan in SchemaRegionStateMachine");

Reply via email to