This is an automated email from the ASF dual-hosted git repository.
qiaojialin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6bd0f2cb0f [IOTDB-2848] Consensus snapshot interface (#5497)
6bd0f2cb0f is described below
commit 6bd0f2cb0fb66f943e46e4f47f8ca8ab11f3e1cf
Author: SzyWilliam <[email protected]>
AuthorDate: Wed Apr 20 12:08:09 2022 +0800
[IOTDB-2848] Consensus snapshot interface (#5497)
---
.../statemachine/PartitionRegionStateMachine.java | 17 ++++++++
.../SnapshotMeta.java} | 41 +++++++++++-------
.../consensus/standalone/StandAloneServerImpl.java | 18 ++++++++
.../consensus/statemachine/EmptyStateMachine.java | 18 ++++++++
.../consensus/statemachine/IStateMachine.java | 49 ++++++++++++++++++++++
.../iotdb/consensus/ratis/RatisConsensusTest.java | 15 +++++++
.../standalone/StandAloneConsensusTest.java | 15 +++++++
.../statemachine/DataRegionStateMachine.java | 17 ++++++++
.../statemachine/SchemaRegionStateMachine.java | 18 ++++++++
9 files changed, 193 insertions(+), 15 deletions(-)
diff --git
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
index 6a97c9953d..38bd8b1305 100644
---
a/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
+++
b/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/PartitionRegionStateMachine.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.confignode.exception.physical.UnknownPhysicalPlanTypeExc
import org.apache.iotdb.confignode.physical.PhysicalPlan;
import org.apache.iotdb.confignode.service.executor.PlanExecutor;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
@@ -32,7 +33,9 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
+import java.nio.ByteBuffer;
/** Statemachine for PartitionRegion */
public class PartitionRegionStateMachine implements IStateMachine {
@@ -95,6 +98,20 @@ public class PartitionRegionStateMachine implements
IStateMachine {
return read(plan);
}
+ @Override
+ public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ return null;
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {}
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {}
+
/** Transmit PhysicalPlan to confignode.service.executor.PlanExecutor */
protected DataSet read(PhysicalPlan plan) {
DataSet result;
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
b/consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
similarity index 50%
copy from
consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
copy to
consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
index 48848cdb15..e2fd0188d2 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/common/SnapshotMeta.java
@@ -16,28 +16,39 @@
* specific language governing permissions and limitations
* under the License.
*/
+package org.apache.iotdb.consensus.common;
-package org.apache.iotdb.consensus.statemachine;
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.List;
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.consensus.common.DataSet;
-import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+public class SnapshotMeta {
+ /**
+ * metadata is the IConsensus metadata given when take this snapshot. More
updated snapshot will
+ * have lexicographically larger metadata.
+ */
+ private ByteBuffer metadata;
-public class EmptyStateMachine implements IStateMachine {
+ private List<File> snapshotFiles;
- @Override
- public void start() {}
+ public SnapshotMeta(ByteBuffer metadata, List<File> snapshotFiles) {
+ this.metadata = metadata;
+ this.snapshotFiles = snapshotFiles;
+ }
+
+ public ByteBuffer getMetadata() {
+ return metadata;
+ }
- @Override
- public void stop() {}
+ public void setMetadata(ByteBuffer metadata) {
+ this.metadata = metadata;
+ }
- @Override
- public TSStatus write(IConsensusRequest IConsensusRequest) {
- return new TSStatus(0);
+ public List<File> getSnapshotFiles() {
+ return snapshotFiles;
}
- @Override
- public DataSet read(IConsensusRequest IConsensusRequest) {
- return null;
+ public void setSnapshotFiles(List<File> snapshotFiles) {
+ this.snapshotFiles = snapshotFiles;
}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
index fe76207f48..bb6220a769 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/standalone/StandAloneServerImpl.java
@@ -22,9 +22,13 @@ package org.apache.iotdb.consensus.standalone;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.statemachine.IStateMachine;
+import java.io.File;
+import java.nio.ByteBuffer;
+
public class StandAloneServerImpl implements IStateMachine {
private final Peer peer;
@@ -62,4 +66,18 @@ public class StandAloneServerImpl implements IStateMachine {
public DataSet read(IConsensusRequest request) {
return stateMachine.read(request);
}
+
+ @Override
+ public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ return null;
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {}
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
index 48848cdb15..1f66bf1d87 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/EmptyStateMachine.java
@@ -21,8 +21,12 @@ package org.apache.iotdb.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import java.io.File;
+import java.nio.ByteBuffer;
+
public class EmptyStateMachine implements IStateMachine {
@Override
@@ -40,4 +44,18 @@ public class EmptyStateMachine implements IStateMachine {
public DataSet read(IConsensusRequest IConsensusRequest) {
return null;
}
+
+ @Override
+ public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ return null;
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {}
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {}
}
diff --git
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
index e573113a79..5279e235a4 100644
---
a/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
+++
b/consensus/src/main/java/org/apache/iotdb/consensus/statemachine/IStateMachine.java
@@ -22,8 +22,11 @@ package org.apache.iotdb.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
+import java.io.File;
+import java.nio.ByteBuffer;
import java.util.function.Function;
public interface IStateMachine {
@@ -34,7 +37,53 @@ public interface IStateMachine {
void stop();
+ /**
+ * apply a write-request from user
+ *
+ * @param IConsensusRequest write request
+ */
TSStatus write(IConsensusRequest IConsensusRequest);
+ /**
+ * read local data and return
+ *
+ * @param IConsensusRequest read request
+ */
DataSet read(IConsensusRequest IConsensusRequest);
+
+ /**
+ * IConsensus will periodically take the snapshot on both log and
statemachine Data
+ *
+ * @param metadata the metadata IConsensus want IStateMachine to preserve.
NOTICE: the more
+ * updated snapshot will have lexicographically larger metadata. This
property should be
+ * guaranteed by every IConsensus implementation. IStateMachine can use
the metadata to sort
+ * or label snapshot.
+ * @param snapshotDir the root dir of snapshot files
+ */
+ void takeSnapshot(ByteBuffer metadata, File snapshotDir);
+
+ /**
+ * When recover from crash / leader installSnapshot to follower, this method
is called.
+ * IStateMachine is required to find the latest snapshot in snapshotDir.
+ *
+ * @param snapshotDir the root dir of snapshot files
+ * @return latest snapshot info (metadata + snapshot files)
+ */
+ SnapshotMeta getLatestSnapshot(File snapshotDir);
+
+ /**
+ * When recover from crash / follower installSnapshot from leader, this
method is called.
+ * IStateMachine is required to load the given snapshot.
+ *
+ * @param latest is the latest snapshot given
+ */
+ void loadSnapshot(SnapshotMeta latest);
+
+ /**
+ * IConsensus will periodically clean up old snapshots. This method is
called to inform
+ * IStateMachine to remove out-dated snapshot.
+ *
+ * @param snapshotDir the root dir of snapshot files
+ */
+ void cleanUpOldSnapshots(File snapshotDir);
}
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
index f5f29470f4..f0e5093f0a 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/ratis/RatisConsensusTest.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.ConsensusGroup;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusReadResponse;
@@ -106,6 +107,20 @@ public class RatisConsensusTest {
dataSet.setNumber(integer.get());
return dataSet;
}
+
+ @Override
+ public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ return null;
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {}
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {}
}
private ConsensusGroupId gid;
diff --git
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
index 2478620c27..b7eb47d975 100644
---
a/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
+++
b/consensus/src/test/java/org/apache/iotdb/consensus/standalone/StandAloneConsensusTest.java
@@ -29,6 +29,7 @@ import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.consensus.IConsensus;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest;
import org.apache.iotdb.consensus.common.request.IConsensusRequest;
import org.apache.iotdb.consensus.common.response.ConsensusGenericResponse;
@@ -109,6 +110,20 @@ public class StandAloneConsensusTest {
public DataSet read(IConsensusRequest request) {
return null;
}
+
+ @Override
+ public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ return null;
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {}
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {}
}
@Before
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
index 28ac5bf5d8..0d99a2d6c2 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/DataRegionStateMachine.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.db.engine.storagegroup.DataRegion;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
@@ -37,6 +38,8 @@ import org.apache.iotdb.rpc.RpcUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.nio.ByteBuffer;
import java.util.Arrays;
public class DataRegionStateMachine extends BaseStateMachine {
@@ -58,6 +61,20 @@ public class DataRegionStateMachine extends BaseStateMachine
{
@Override
public void stop() {}
+ @Override
+ public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ return null;
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {}
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {}
+
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
PlanNode insertNode = fragmentInstance.getFragment().getRoot();
diff --git
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
index f964522d29..d18a1276b7 100644
---
a/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
+++
b/server/src/main/java/org/apache/iotdb/db/consensus/statemachine/SchemaRegionStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.consensus.statemachine;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.consensus.common.DataSet;
+import org.apache.iotdb.consensus.common.SnapshotMeta;
import org.apache.iotdb.db.metadata.Executor.SchemaVisitor;
import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
import org.apache.iotdb.db.mpp.execution.FragmentInstanceManager;
@@ -30,6 +31,9 @@ import org.apache.iotdb.db.mpp.sql.planner.plan.node.PlanNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
+import java.nio.ByteBuffer;
+
public class SchemaRegionStateMachine extends BaseStateMachine {
private static final Logger logger =
LoggerFactory.getLogger(SchemaRegionStateMachine.class);
@@ -48,6 +52,20 @@ public class SchemaRegionStateMachine extends
BaseStateMachine {
@Override
public void stop() {}
+ @Override
+ public void takeSnapshot(ByteBuffer metadata, File snapshotDir) {}
+
+ @Override
+ public SnapshotMeta getLatestSnapshot(File snapshotDir) {
+ return null;
+ }
+
+ @Override
+ public void loadSnapshot(SnapshotMeta latest) {}
+
+ @Override
+ public void cleanUpOldSnapshots(File snapshotDir) {}
+
@Override
protected TSStatus write(FragmentInstance fragmentInstance) {
logger.info("Execute write plan in SchemaRegionStateMachine");