This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 18fc93be959 Refactor Procedure recover framework (#12202)
18fc93be959 is described below
commit 18fc93be959656d29dd76a5caf918e039e3dae32
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Mar 22 10:45:14 2024 +0800
Refactor Procedure recover framework (#12202)
---
.../confignode/it/procedure/IoTDBProcedureIT.java | 4 +-
.../consensus/request/ConfigPhysicalPlanType.java | 3 +
.../write/procedure/UpdateProcedurePlan.java | 9 +-
.../statemachine/ConfigRegionStateMachine.java | 6 +-
.../iotdb/confignode/manager/ConfigManager.java | 4 +-
.../iotdb/confignode/manager/ProcedureManager.java | 36 ++--
.../manager/consensus/ConsensusManager.java | 6 +
.../confignode/persistence/ProcedureInfo.java | 235 ++++++++++++++++++---
.../persistence/executor/ConfigPlanExecutor.java | 3 +
.../procedure/CompletedProcedureRecycler.java | 6 +-
.../iotdb/confignode/procedure/Procedure.java | 14 +-
.../confignode/procedure/ProcedureExecutor.java | 108 ++++------
.../procedure/TimeoutExecutorThread.java | 2 +-
.../AddNeverFinishSubProcedureProcedure.java | 6 +
.../impl/testonly/NeverFinishProcedure.java | 17 ++
.../procedure/store/ConfigProcedureStore.java | 44 +++-
.../procedure/store/IProcedureStore.java | 17 +-
.../confignode/procedure/store/ProcedureType.java | 4 +-
.../confignode/procedure/store/ProcedureWAL.java | 46 +---
.../consensus/request/TestOnlyPlan.java} | 39 ++--
.../confignode/persistence/ProcedureInfoTest.java | 75 +++++++
.../confignode/procedure/NoopProcedureStore.java | 31 ++-
.../UpgradeFromWALToConsensusLayerTest.java | 137 ++++++++++++
.../procedure/util/ProcedureTestUtil.java | 11 -
.../iotdb/consensus/ratis/RatisConsensus.java | 3 +-
25 files changed, 630 insertions(+), 236 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
index 37cb4efd162..6b92ca8c64f 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
@@ -126,7 +126,7 @@ public class IoTDBProcedureIT {
Assert.assertTrue(resp.getDatabaseInfoMap().size() < MAX_STATE);
// Then shutdown the leader, wait the new leader exist and the procedure
continue
final int oldLeaderIndex = EnvFactory.getEnv().getLeaderConfigNodeIndex();
- EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stop();
+ EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stopForcibly();
if (needRestartLeader) {
EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).start();
}
@@ -179,7 +179,7 @@ public class IoTDBProcedureIT {
// Restart the ConfigNode
final int leaderConfigNodeIndex =
EnvFactory.getEnv().getLeaderConfigNodeIndex();
-
EnvFactory.getEnv().getConfigNodeWrapperList().get(leaderConfigNodeIndex).stop();
+
EnvFactory.getEnv().getConfigNodeWrapperList().get(leaderConfigNodeIndex).stopForcibly();
EnvFactory.getEnv().getConfigNodeWrapperList().get(leaderConfigNodeIndex).start();
SyncConfigNodeIServiceClient newLeaderClient =
(SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
index b8d916189a2..cc963606c61 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/ConfigPhysicalPlanType.java
@@ -219,6 +219,9 @@ public enum ConfigPhysicalPlanType {
AlterConsumerGroup((short) 1900),
ShowSubscription((short) 2000),
+
+ /** Test Only. */
+ TestOnly((short) 30000),
;
private final short planType;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/procedure/UpdateProcedurePlan.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/procedure/UpdateProcedurePlan.java
index 15d3d22d06a..45cea9af5eb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/procedure/UpdateProcedurePlan.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/request/write/procedure/UpdateProcedurePlan.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.confignode.consensus.request.write.procedure;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlan;
import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import java.io.DataOutputStream;
@@ -31,13 +32,13 @@ import java.util.Objects;
public class UpdateProcedurePlan extends ConfigPhysicalPlan {
- private Procedure procedure;
+ private Procedure<ConfigNodeProcedureEnv> procedure;
- public Procedure getProcedure() {
+ public Procedure<ConfigNodeProcedureEnv> getProcedure() {
return procedure;
}
- public void setProcedure(Procedure procedure) {
+ public void setProcedure(Procedure<ConfigNodeProcedureEnv> procedure) {
this.procedure = procedure;
}
@@ -45,7 +46,7 @@ public class UpdateProcedurePlan extends ConfigPhysicalPlan {
super(ConfigPhysicalPlanType.UpdateProcedure);
}
- public UpdateProcedurePlan(Procedure procedure) {
+ public UpdateProcedurePlan(Procedure<ConfigNodeProcedureEnv> procedure) {
this();
this.procedure = procedure;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
index e8793dcf8fb..375628100e9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/consensus/statemachine/ConfigRegionStateMachine.java
@@ -222,7 +222,7 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeMetaSync();
configManager.getPipeManager().getPipeRuntimeCoordinator().stopPipeHeartbeat();
configManager.getLoadManager().stopLoadServices();
- configManager.getProcedureManager().shiftExecutor(false);
+ configManager.getProcedureManager().stopExecutor();
configManager.getRetryFailedTasksThread().stopRetryFailedTasksService();
configManager.getPartitionManager().stopRegionCleaner();
configManager.getCQManager().stopCQScheduler();
@@ -246,7 +246,9 @@ public class ConfigRegionStateMachine implements
IStateMachine, IStateMachine.Ev
configManager.getLoadManager().startLoadServices();
// Start leader scheduling services
- configManager.getProcedureManager().shiftExecutor(true);
+ configManager.getProcedureManager().startExecutor();
+ threadPool.submit(
+ () ->
configManager.getProcedureManager().getStore().getProcedureInfo().upgrade());
configManager.getRetryFailedTasksThread().startRetryFailedTasksService();
configManager.getPartitionManager().startRegionCleaner();
configManager.checkUserPathPrivilege();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 5af1da5437a..812b232340b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -284,7 +284,7 @@ public class ConfigManager implements IManager {
ClusterSchemaInfo clusterSchemaInfo = new ClusterSchemaInfo();
PartitionInfo partitionInfo = new PartitionInfo();
AuthorInfo authorInfo = new AuthorInfo();
- ProcedureInfo procedureInfo = new ProcedureInfo();
+ ProcedureInfo procedureInfo = new ProcedureInfo(this);
UDFInfo udfInfo = new UDFInfo();
TriggerInfo triggerInfo = new TriggerInfo();
CQInfo cqInfo = new CQInfo();
@@ -350,7 +350,7 @@ public class ConfigManager implements IManager {
partitionManager.getRegionMaintainer().shutdown();
}
if (procedureManager != null) {
- procedureManager.shiftExecutor(false);
+ procedureManager.stopExecutor();
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 55cc36ddbb4..e5a6ce7ec35 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -161,25 +161,25 @@ public class ProcedureManager {
this.procedureMetrics = new ProcedureMetrics(this);
}
- public void shiftExecutor(boolean running) {
- if (running) {
+ public void startExecutor() {
+ if (!executor.isRunning()) {
+ executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount());
+ executor.startWorkers();
+ executor.startCompletedCleaner(
+ CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
+ CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
+ store.start();
+ LOGGER.info("ProcedureManager is started successfully.");
+ }
+ }
+
+ public void stopExecutor() {
+ if (executor.isRunning()) {
+ executor.stop();
if (!executor.isRunning()) {
- executor.init(CONFIG_NODE_CONFIG.getProcedureCoreWorkerThreadsCount());
- executor.startWorkers();
- executor.startCompletedCleaner(
- CONFIG_NODE_CONFIG.getProcedureCompletedCleanInterval(),
- CONFIG_NODE_CONFIG.getProcedureCompletedEvictTTL());
- store.start();
- LOGGER.info("ProcedureManager is started successfully.");
- }
- } else {
- if (executor.isRunning()) {
- executor.stop();
- if (!executor.isRunning()) {
- executor.join();
- store.stop();
- LOGGER.info("ProcedureManager is stopped successfully.");
- }
+ executor.join();
+ store.stop();
+ LOGGER.info("ProcedureManager is stopped successfully.");
}
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index 3d78a858205..8fcea7da15f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -42,6 +42,7 @@ import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.config.ConsensusConfig;
import org.apache.iotdb.consensus.config.RatisConfig;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.ratis.util.SizeInBytes;
@@ -144,6 +145,7 @@ public class ConsensusManager {
RatisConfig.Snapshot.newBuilder()
.setAutoTriggerThreshold(
CONF.getConfigNodeRatisSnapshotTriggerThreshold())
+ .setCreationGap(1)
.build())
.setLog(
RatisConfig.Log.newBuilder()
@@ -432,4 +434,8 @@ public class ConsensusManager {
private NodeManager getNodeManager() {
return configManager.getNodeManager();
}
+
+ public void manuallyTakeSnapshot() throws ConsensusException {
+ consensusImpl.triggerSnapshot(ConfigNodeInfo.CONFIG_REGION_ID);
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index fe181ef382e..9a6666eee68 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -21,79 +21,260 @@ package org.apache.iotdb.confignode.persistence;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
- private static final Logger LOG =
LoggerFactory.getLogger(ProcedureInfo.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProcedureInfo.class);
+ private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+ private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+ private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+ private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+ private final String OLD_PROCEDURE_WAL_DIR =
+ CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+ private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
+ new ConcurrentHashMap<>();
+
+ private long lastProcId = -1;
private final ProcedureFactory procedureFactory =
ProcedureFactory.getInstance();
- private final String procedureWalDir =
- CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
- private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new
ConcurrentHashMap<>();
- public void load(List<Procedure> procedureList) {
- try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
+ private final ConfigManager configManager;
+
+ public ProcedureInfo(ConfigManager configManager) {
+ this.configManager = configManager;
+ }
+
+ public boolean isOldVersion() {
+ return new File(OLD_PROCEDURE_WAL_DIR).exists();
+ }
+
+ public List<Procedure<ConfigNodeProcedureEnv>> oldLoad() {
+ List<Procedure<ConfigNodeProcedureEnv>> procedureList = new ArrayList<>();
+ try (Stream<Path> s = Files.list(Paths.get(OLD_PROCEDURE_WAL_DIR))) {
s.filter(path ->
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
.sorted(
(p1, p2) ->
Long.compareUnsigned(
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
- .forEach(
- path -> {
- String fileName = path.getFileName().toString();
- long procId = Long.parseLong(fileName.split("\\.")[0]);
- ProcedureWAL procedureWAL =
- procWALMap.computeIfAbsent(
- procId, id -> new ProcedureWAL(path,
procedureFactory));
- procedureWAL.load(procedureList);
- });
+ .forEach(path -> loadProcedure(path).ifPresent(procedureList::add));
} catch (IOException e) {
- LOG.error("Load procedure wal failed.", e);
+ LOGGER.error("Load procedure wal failed.", e);
+ }
+ procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(),
procedure));
+ procedureList.forEach(procedure -> lastProcId = Math.max(lastProcId,
procedure.getProcId()));
+ return procedureList;
+ }
+
+ public void upgrade() {
+ if (isOldVersion()) {
+ try {
+ LOGGER.info("Old procedure files have been loaded successfully, taking
snapshot...");
+ configManager.getConsensusManager().manuallyTakeSnapshot();
+ } catch (ConsensusException e) {
+ LOGGER.warn("Taking snapshot fail, procedure upgrade fail", e);
+ return;
+ }
+ try {
+ FileUtils.recursiveDeleteFolder(OLD_PROCEDURE_WAL_DIR);
+ } catch (IOException e) {
+ LOGGER.error("Delete useless procedure wal dir fail.", e);
+ LOGGER.error(
+ "You should manually delete the procedure wal dir before
ConfigNode restart. {}",
+ OLD_PROCEDURE_WAL_DIR);
+ }
+ LOGGER.info(
+ "The Procedure framework has been successfully upgraded. Now it uses
the consensus layer's services instead of maintaining the WAL itself.");
}
}
public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
+ Procedure<ConfigNodeProcedureEnv> procedure =
updateProcedurePlan.getProcedure();
+ procedureMap.put(procedure.getProcId(), procedure);
+ lastProcId = Math.max(lastProcId, procedure.getProcId());
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ @TestOnly
+ public TSStatus oldUpdateProcedure(UpdateProcedurePlan updateProcedurePlan) {
Procedure procedure = updateProcedurePlan.getProcedure();
long procId = procedure.getProcId();
- Path path = Paths.get(procedureWalDir, procId + PROCEDURE_WAL_SUFFIX);
- ProcedureWAL procedureWAL =
- procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path,
procedureFactory));
+ Path path = Paths.get(OLD_PROCEDURE_WAL_DIR, procId +
PROCEDURE_WAL_SUFFIX);
+ ProcedureWAL procedureWAL = new ProcedureWAL(path, procedureFactory);
try {
procedureWAL.save(procedure);
} catch (IOException e) {
- LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId(),
e);
+ LOGGER.error("Update Procedure (pid={}) wal failed",
procedure.getProcId(), e);
return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
}
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
public TSStatus deleteProcedure(DeleteProcedurePlan deleteProcedurePlan) {
- long procId = deleteProcedurePlan.getProcId();
- ProcedureWAL procedureWAL = procWALMap.get(procId);
- if (procedureWAL != null) {
- procedureWAL.delete();
- }
- procWALMap.remove(procId);
+ procedureMap.remove(deleteProcedurePlan.getProcId());
return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
}
+
+ private static Optional<Procedure> loadProcedure(Path procedureFilePath) {
+ try (FileInputStream fis = new
FileInputStream(procedureFilePath.toFile())) {
+ Procedure procedure = null;
+ try (FileChannel channel = fis.getChannel()) {
+ ByteBuffer byteBuffer =
ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
+ if (channel.read(byteBuffer) > 0) {
+ byteBuffer.flip();
+ procedure = ProcedureFactory.getInstance().create(byteBuffer);
+ byteBuffer.clear();
+ }
+ return Optional.ofNullable(procedure);
+ }
+ } catch (IOException e) {
+ LOGGER.error("Load {} failed, it will be deleted.", procedureFilePath,
e);
+ if (!procedureFilePath.toFile().delete()) {
+ LOGGER.error("{} deleted failed; take appropriate action.",
procedureFilePath, e);
+ }
+ }
+ return Optional.empty();
+ }
+
+ @Override
+ public boolean processTakeSnapshot(File snapshotDir) throws TException,
IOException {
+ File procedureSnapshotDir = new File(snapshotDir, PROCEDURE_SNAPSHOT_DIR);
+ if (procedureSnapshotDir.exists()) {
+ LOGGER.error(
+ "Failed to take snapshot, because snapshot dir [{}] is already
exist.",
+ procedureSnapshotDir.getAbsolutePath());
+ return false;
+ }
+ File tmpDir = new File(procedureSnapshotDir.getAbsolutePath() + "-" +
UUID.randomUUID());
+ if (!tmpDir.mkdir()) {
+ LOGGER.error("Failed to take snapshot, because create tmp dir [{}]
fail.", tmpDir);
+ return false;
+ }
+
+ // save lastProcId
+ File mainFile = new File(tmpDir.getAbsolutePath() + File.separator +
MAIN_SNAPSHOT_FILENAME);
+ try (FileOutputStream fileOutputStream = new FileOutputStream(mainFile);
+ DataOutputStream dataOutputStream = new
DataOutputStream(fileOutputStream);
+ TIOStreamTransport tioStreamTransport = new
TIOStreamTransport(fileOutputStream)) {
+ ReadWriteIOUtils.write(lastProcId, fileOutputStream);
+ tioStreamTransport.flush();
+ fileOutputStream.getFD().sync();
+ }
+
+ // save all procedures
+ AtomicBoolean snapshotAllSuccess = new AtomicBoolean(true);
+ procedureMap
+ .values()
+ .forEach(
+ procedure -> {
+ try {
+ new ProcedureWAL(
+ Paths.get(
+ tmpDir.getAbsolutePath()
+ + File.separator
+ + procedure.getProcId()
+ + PROCEDURE_SNAPSHOT_FILE_SUFFIX),
+ procedureFactory)
+ .save(procedure);
+ } catch (IOException e) {
+ snapshotAllSuccess.set(false);
+ LOGGER.warn(
+ "{} id {} took snapshot fail", procedure.getClass(),
procedure.getProcId(), e);
+ }
+ });
+ if (!snapshotAllSuccess.get()) {
+ return false;
+ }
+
+ return tmpDir.renameTo(procedureSnapshotDir);
+ }
+
+ @Override
+ public void processLoadSnapshot(File snapshotDir) throws TException,
IOException {
+ File procedureSnapshotDir = new File(snapshotDir, PROCEDURE_SNAPSHOT_DIR);
+ if (!procedureSnapshotDir.exists() || !procedureSnapshotDir.isDirectory())
{
+ LOGGER.error(
+ "Failed to load snapshot, because snapshot dir [{}] not exists.",
+ procedureSnapshotDir.getAbsolutePath());
+ return;
+ }
+
+ File mainFile =
+ new File(procedureSnapshotDir.getAbsolutePath() + File.separator +
MAIN_SNAPSHOT_FILENAME);
+ try (FileInputStream fileInputStream = new FileInputStream(mainFile)) {
+ lastProcId = ReadWriteIOUtils.readLong(fileInputStream);
+ }
+
+ Arrays.stream(Objects.requireNonNull(procedureSnapshotDir.listFiles()))
+ .forEach(
+ procedureSnapshotFile -> {
+ if
(!procedureSnapshotFile.getName().equals(MAIN_SNAPSHOT_FILENAME)) {
+ loadProcedure(procedureSnapshotFile.toPath())
+ .ifPresent(procedure ->
procedureMap.put(procedure.getProcId(), procedure));
+ }
+ });
+ }
+
+ public List<Procedure<ConfigNodeProcedureEnv>> getProcedures() {
+ return new ArrayList<>(procedureMap.values());
+ }
+
+ public long getNextProcId() {
+ return ++this.lastProcId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ProcedureInfo procedureInfo = (ProcedureInfo) o;
+ return lastProcId == procedureInfo.lastProcId
+ && procedureMap.equals(procedureInfo.procedureMap);
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
index 7c16ee0104c..14f81103771 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/executor/ConfigPlanExecutor.java
@@ -220,6 +220,7 @@ public class ConfigPlanExecutor {
this.snapshotProcessorList.add(subscriptionInfo);
this.procedureInfo = procedureInfo;
+ this.snapshotProcessorList.add(procedureInfo);
this.quotaInfo = quotaInfo;
this.snapshotProcessorList.add(quotaInfo);
@@ -489,6 +490,8 @@ public class ConfigPlanExecutor {
// PipeUnsetTemplate plan will not be written here, and exists only
after pipe sender
// collects UnsetTemplatePlan and before receiver calls ConfigManager.
throw new UnsupportedOperationException("PipeUnsetTemplate is not
supported.");
+ case TestOnly:
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
default:
throw new UnknownPhysicalPlanTypeException(physicalPlan.getType());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
index 46d1b47dbc0..ccf3afeb4e6 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java
@@ -32,12 +32,12 @@ import java.util.concurrent.TimeUnit;
public class CompletedProcedureRecycler<Env> extends InternalProcedure<Env> {
private static final Logger LOG =
LoggerFactory.getLogger(CompletedProcedureRecycler.class);
private static final int DEFAULT_BATCH_SIZE = 32;
- private long evictTTL;
+ private final long evictTTL;
private final Map<Long, CompletedProcedureContainer<Env>> completed;
- private final IProcedureStore store;
+ private final IProcedureStore<Env> store;
public CompletedProcedureRecycler(
- IProcedureStore store,
+ IProcedureStore<Env> store,
Map<Long, CompletedProcedureContainer<Env>> completedMap,
long cleanTimeInterval,
long evictTTL) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 53bdc7774a3..1008fe62dfd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -40,7 +40,6 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -571,7 +570,7 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
}
/** Called by the ProcedureExecutor to assign the ID to the newly created
procedure. */
- protected void setProcId(long procId) {
+ public void setProcId(long procId) {
this.procId = procId;
}
@@ -882,17 +881,6 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
return stackIndexes;
}
- /** Helper to lookup the root Procedure ID given a specified procedure. */
- protected static long getRootProcedureId(Map<Long, Procedure> procedures,
Procedure proc) {
- while (proc.hasParent()) {
- proc = procedures.get(proc.getParentProcId());
- if (proc == null) {
- return NO_PROC_ID;
- }
- }
- return proc.getProcId();
- }
-
public void setRootProcedureId(long rootProcedureId) {
this.rootProcId = rootProcedureId;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index 30284b478f5..0cd0b2ee422 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -50,6 +50,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import static org.apache.iotdb.confignode.procedure.Procedure.NO_PROC_ID;
+
public class ProcedureExecutor<Env> {
private static final Logger LOG =
LoggerFactory.getLogger(ProcedureExecutor.class);
@@ -59,7 +61,7 @@ public class ProcedureExecutor<Env> {
private final ConcurrentHashMap<Long, RootProcedureStack<Env>> rollbackStack
=
new ConcurrentHashMap<>();
- private final ConcurrentHashMap<Long, Procedure> procedures = new
ConcurrentHashMap<>();
+ private final ConcurrentHashMap<Long, Procedure<Env>> procedures = new
ConcurrentHashMap<>();
private ThreadGroup threadGroup;
@@ -74,23 +76,21 @@ public class ProcedureExecutor<Env> {
private final ProcedureScheduler scheduler;
- private final AtomicLong lastProcId = new AtomicLong(-1);
private final AtomicLong workId = new AtomicLong(0);
private final AtomicInteger activeExecutorCount = new AtomicInteger(0);
private final AtomicBoolean running = new AtomicBoolean(false);
private final Env environment;
- private final IProcedureStore store;
+ private final IProcedureStore<Env> store;
public ProcedureExecutor(
- final Env environment, final IProcedureStore store, final
ProcedureScheduler scheduler) {
+ final Env environment, final IProcedureStore<Env> store, final
ProcedureScheduler scheduler) {
this.environment = environment;
this.scheduler = scheduler;
this.store = store;
- this.lastProcId.incrementAndGet();
}
@TestOnly
- public ProcedureExecutor(final Env environment, final IProcedureStore store)
{
+ public ProcedureExecutor(final Env environment, final IProcedureStore<Env>
store) {
this(environment, store, new SimpleProcedureScheduler());
}
@@ -118,55 +118,34 @@ public class ProcedureExecutor<Env> {
private void recover() {
// 1.Build rollback stack
- int runnableCount = 0;
- int failedCount = 0;
- int waitingCount = 0;
- int waitingTimeoutCount = 0;
- List<Procedure> procedureList = new ArrayList<>();
+ List<Procedure<Env>> procedureList =
getProcedureListFromDifferentVersion();
// Load procedure wal file
- store.load(procedureList);
for (Procedure<Env> proc : procedureList) {
if (proc.isFinished()) {
- completed.putIfAbsent(proc.getProcId(), new
CompletedProcedureContainer(proc));
+ completed.putIfAbsent(proc.getProcId(), new
CompletedProcedureContainer<>(proc));
} else {
if (!proc.hasParent()) {
rollbackStack.put(proc.getProcId(), new RootProcedureStack<>());
}
}
procedures.putIfAbsent(proc.getProcId(), proc);
- switch (proc.getState()) {
- case RUNNABLE:
- runnableCount++;
- break;
- case FAILED:
- failedCount++;
- break;
- case WAITING:
- waitingCount++;
- break;
- case WAITING_TIMEOUT:
- waitingTimeoutCount++;
- break;
- default:
- break;
- }
}
- List<Procedure<Env>> runnableList = new ArrayList<>(runnableCount);
- List<Procedure<Env>> failedList = new ArrayList<>(failedCount);
- List<Procedure<Env>> waitingList = new ArrayList<>(waitingCount);
- List<Procedure<Env>> waitingTimeoutList = new
ArrayList<>(waitingTimeoutCount);
+ List<Procedure<Env>> runnableList = new ArrayList<>();
+ List<Procedure<Env>> failedList = new ArrayList<>();
+ List<Procedure<Env>> waitingList = new ArrayList<>();
+ List<Procedure<Env>> waitingTimeoutList = new ArrayList<>();
for (Procedure<Env> proc : procedureList) {
if (proc.isFinished() && !proc.hasParent()) {
continue;
}
- long rootProcedureId = getRootProcId(proc);
+ long rootProcedureId = getRootProcedureId(proc);
if (proc.hasParent()) {
Procedure<Env> parent = procedures.get(proc.getParentProcId());
if (parent != null && !proc.isFinished()) {
parent.incChildrenLatch();
}
}
- RootProcedureStack rootStack = rollbackStack.get(rootProcedureId);
+ RootProcedureStack<Env> rootStack = rollbackStack.get(rootProcedureId);
if (rootStack != null) {
rootStack.loadStack(proc);
}
@@ -226,8 +205,28 @@ public class ProcedureExecutor<Env> {
scheduler.signalAll();
}
- public long getRootProcId(Procedure proc) {
- return Procedure.getRootProcedureId(procedures, proc);
+ private List<Procedure<Env>> getProcedureListFromDifferentVersion() {
+ if (store.isOldVersionProcedureStore()) {
+ LOG.info("Old procedure directory detected, upgrade beginning...");
+ return store.load();
+ } else {
+ return store.getProcedures();
+ }
+ }
+
+ /**
+ * Helper to look up the root Procedure ID.
+ *
+ * @param proc given a specified procedure.
+ */
+ Long getRootProcedureId(Procedure<Env> proc) {
+ while (proc.hasParent()) {
+ proc = procedures.get(proc.getParentProcId());
+ if (proc == null) {
+ return NO_PROC_ID;
+ }
+ }
+ return proc.getProcId();
}
private void releaseLock(Procedure<Env> procedure, boolean force) {
@@ -303,27 +302,6 @@ public class ProcedureExecutor<Env> {
return timeoutExecutor.remove(internalProcedure);
}
- /**
- * Get next Procedure id
- *
- * @return next procedure id
- */
- private long nextProcId() {
- long procId = lastProcId.incrementAndGet();
- if (procId < 0) {
- while (!lastProcId.compareAndSet(procId, 0)) {
- procId = lastProcId.get();
- if (procId >= 0) {
- break;
- }
- }
- while (procedures.containsKey(procId)) {
- procId = lastProcId.incrementAndGet();
- }
- }
- return procId;
- }
-
/**
* Executes procedure
*
@@ -563,7 +541,7 @@ public class ProcedureExecutor<Env> {
}
subproc.setParentProcId(proc.getProcId());
subproc.setRootProcId(rootProcedureId);
- subproc.setProcId(nextProcId());
+ subproc.setProcId(store.getNextProcId());
subproc.setProcRunnable();
rootProcStack.addSubProcedure(subproc);
}
@@ -713,10 +691,6 @@ public class ProcedureExecutor<Env> {
procedures.remove(proc.getProcId());
}
- private Long getRootProcedureId(Procedure<Env> proc) {
- return Procedure.getRootProcedureId(procedures, proc);
- }
-
/**
* Add a Procedure to executor.
*
@@ -790,7 +764,7 @@ public class ProcedureExecutor<Env> {
@Override
public String toString() {
Procedure<?> p = this.activeProcedure.get();
- return getName() + "(pid=" + (p == null ? Procedure.NO_PROC_ID :
p.getProcId() + ")");
+ return getName() + "(pid=" + (p == null ? NO_PROC_ID : p.getProcId() +
")");
}
/** @return the time since the current procedure is running */
@@ -921,7 +895,7 @@ public class ProcedureExecutor<Env> {
return !procedures.containsKey(procId);
}
- public ConcurrentHashMap<Long, Procedure> getProcedures() {
+ public ConcurrentHashMap<Long, Procedure<Env>> getProcedures() {
return procedures;
}
@@ -933,12 +907,10 @@ public class ProcedureExecutor<Env> {
* @return procedure id
*/
public long submitProcedure(Procedure<Env> procedure) {
- Preconditions.checkArgument(lastProcId.get() >= 0);
Preconditions.checkArgument(procedure.getState() ==
ProcedureState.INITIALIZING);
Preconditions.checkArgument(!procedure.hasParent(), "Unexpected parent",
procedure);
- final long currentProcId = nextProcId();
// Initialize the procedure
- procedure.setProcId(currentProcId);
+ procedure.setProcId(store.getNextProcId());
procedure.setProcRunnable();
// Commit the transaction
store.update(procedure);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
index 28ba7aafcef..d4f919c01be 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/TimeoutExecutorThread.java
@@ -68,7 +68,7 @@ public class TimeoutExecutorThread<Env> extends
StoppableThread {
queue.add(delayTask);
} else {
if (procedure.setTimeoutFailure(executor.getEnvironment())) {
- long rootProcId = executor.getRootProcId(procedure);
+ long rootProcId = executor.getRootProcedureId(procedure);
RootProcedureStack<Env> rollbackStack =
executor.getRollbackStack(rootProcId);
rollbackStack.abort();
executor.getStore().update(procedure);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/AddNeverFinishSubProcedureProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/AddNeverFinishSubProcedureProcedure.java
index 9fff83235ef..e615eaf7f27 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/AddNeverFinishSubProcedureProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/AddNeverFinishSubProcedureProcedure.java
@@ -27,12 +27,17 @@ import
org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.DataOutputStream;
import java.io.IOException;
@TestOnly
public class AddNeverFinishSubProcedureProcedure
extends StateMachineProcedure<ConfigNodeProcedureEnv, Integer> {
+ public static final Logger LOGGER =
+ LoggerFactory.getLogger(AddNeverFinishSubProcedureProcedure.class);
public static final String FAIL_DATABASE_NAME = "root.fail";
@Override
@@ -46,6 +51,7 @@ public class AddNeverFinishSubProcedureProcedure
}
if (state == 1) {
// test fail
+ LOGGER.error("AddNeverFinishSubProcedureProcedure run again, which
should never happen");
ProcedureTestUtils.createDatabase(env.getConfigManager(),
FAIL_DATABASE_NAME);
}
return Flow.NO_MORE_STATE;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/NeverFinishProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/NeverFinishProcedure.java
index 2fd7b56d252..df6c1220957 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/NeverFinishProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/testonly/NeverFinishProcedure.java
@@ -33,6 +33,12 @@ import java.io.IOException;
/** This procedure will never finish. */
@TestOnly
public class NeverFinishProcedure extends
StateMachineProcedure<ConfigNodeProcedureEnv, Integer> {
+ public NeverFinishProcedure() {}
+
+ public NeverFinishProcedure(long procId) {
+ this.setProcId(procId);
+ }
+
@Override
protected Flow executeFromState(ConfigNodeProcedureEnv env, Integer state)
throws ProcedureSuspendedException, ProcedureYieldException,
InterruptedException {
@@ -65,4 +71,15 @@ public class NeverFinishProcedure extends
StateMachineProcedure<ConfigNodeProced
stream.writeShort(ProcedureType.NEVER_FINISH_PROCEDURE.getTypeCode());
super.serialize(stream);
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ return getProcId() == ((NeverFinishProcedure) o).getProcId();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
index ce832fe7d02..393c1e93740 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java
@@ -21,12 +21,13 @@ package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.manager.ConfigManager;
-import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.consensus.exception.ConsensusException;
import org.slf4j.Logger;
@@ -37,7 +38,7 @@ import java.io.IOException;
import java.util.List;
import java.util.Objects;
-public class ConfigProcedureStore implements IProcedureStore {
+public class ConfigProcedureStore implements
IProcedureStore<ConfigNodeProcedureEnv> {
private static final Logger LOG =
LoggerFactory.getLogger(ConfigProcedureStore.class);
@@ -57,10 +58,6 @@ public class ConfigProcedureStore implements IProcedureStore
{
}
}
- public ConsensusManager getConsensusManager() {
- return configManager.getConsensusManager();
- }
-
@Override
public boolean isRunning() {
return isRunning;
@@ -72,16 +69,31 @@ public class ConfigProcedureStore implements
IProcedureStore {
}
@Override
- public void load(List<Procedure> procedureList) {
- procedureInfo.load(procedureList);
+ public List<Procedure<ConfigNodeProcedureEnv>> load() {
+ return procedureInfo.oldLoad();
+ }
+
+ @Override
+ public List<Procedure<ConfigNodeProcedureEnv>> getProcedures() {
+ return procedureInfo.getProcedures();
+ }
+
+ @Override
+ public ProcedureInfo getProcedureInfo() {
+ return procedureInfo;
}
@Override
- public void update(Procedure procedure) {
+ public long getNextProcId() {
+ return procedureInfo.getNextProcId();
+ }
+
+ @Override
+ public void update(Procedure<ConfigNodeProcedureEnv> procedure) {
Objects.requireNonNull(ProcedureFactory.getProcedureType(procedure),
"Procedure type is null");
final UpdateProcedurePlan updateProcedurePlan = new
UpdateProcedurePlan(procedure);
try {
- getConsensusManager().write(updateProcedurePlan);
+ configManager.getConsensusManager().write(updateProcedurePlan);
} catch (ConsensusException e) {
LOG.warn("Failed in the write API executing the consensus layer due to:
", e);
}
@@ -99,7 +111,7 @@ public class ConfigProcedureStore implements IProcedureStore
{
DeleteProcedurePlan deleteProcedurePlan = new DeleteProcedurePlan();
deleteProcedurePlan.setProcId(procId);
try {
- getConsensusManager().write(deleteProcedurePlan);
+ configManager.getConsensusManager().write(deleteProcedurePlan);
} catch (ConsensusException e) {
LOG.warn("Failed in the write API executing the consensus layer due to:
", e);
}
@@ -138,6 +150,11 @@ public class ConfigProcedureStore implements
IProcedureStore {
private void checkProcWalDir(String procedureWalDir) throws IOException {
File dir = new File(procedureWalDir);
checkOldProcWalDir(dir);
+ }
+
+ @TestOnly
+ public static void createOldProcWalDir() throws IOException {
+ File dir = new
File(CommonDescriptor.getInstance().getConfig().getProcedureWalFolder());
if (!dir.exists()) {
if (dir.mkdirs()) {
LOG.info("Make procedure wal dir: {}", dir);
@@ -156,4 +173,9 @@ public class ConfigProcedureStore implements
IProcedureStore {
FileUtils.moveFileSafe(oldDir, newDir);
}
}
+
+ @Override
+ public boolean isOldVersionProcedureStore() {
+ return procedureInfo.isOldVersion();
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
index 79161e97840..8e8e715fd84 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
@@ -19,21 +19,28 @@
package org.apache.iotdb.confignode.procedure.store;
+import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.Procedure;
import java.util.List;
-public interface IProcedureStore {
+public interface IProcedureStore<Env> {
boolean isRunning();
void setRunning(boolean running);
- void load(List<Procedure> procedureList);
+ List<Procedure<Env>> load();
- void update(Procedure procedure);
+ List<Procedure<Env>> getProcedures();
- void update(Procedure[] subprocs);
+ ProcedureInfo getProcedureInfo();
+
+ long getNextProcId();
+
+ void update(Procedure<Env> procedure);
+
+ void update(Procedure<Env>[] subprocs);
void delete(long procId);
@@ -46,4 +53,6 @@ public interface IProcedureStore {
void stop();
void start();
+
+ boolean isOldVersionProcedureStore();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 2f7e230898a..0edbae8ba13 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -109,9 +109,9 @@ public enum ProcedureType {
/** Other */
@TestOnly
- NEVER_FINISH_PROCEDURE((short) 66600),
+ NEVER_FINISH_PROCEDURE((short) 30000),
@TestOnly
- ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 66601);
+ ADD_NEVER_FINISH_SUB_PROCEDURE_PROCEDURE((short) 30001);
private final short typeCode;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java
index d4638dd0b23..c6288067672 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureWAL.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.procedure.store;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.tsfile.utils.PublicBAOS;
@@ -27,22 +28,19 @@ import org.slf4j.LoggerFactory;
import java.io.DataOutputStream;
import java.io.File;
-import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
+/** Reserve this class for version upgrade test. */
+@TestOnly
public class ProcedureWAL {
- private static final Logger LOG =
LoggerFactory.getLogger(ProcedureWAL.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(ProcedureWAL.class);
- private static final String TMP_SUFFIX = ".tmp";
- private static final int PROCEDURE_WAL_BUFFER_SIZE = 8 * 1024 * 1024;
private IProcedureFactory procedureFactory;
private Path walFilePath;
@@ -56,8 +54,9 @@ public class ProcedureWAL {
*
* @throws IOException ioe
*/
+ @TestOnly
public void save(Procedure procedure) throws IOException {
- File walTmp = new File(walFilePath + TMP_SUFFIX);
+ File walTmp = new File(walFilePath + ".tmp");
Path walTmpPath = walTmp.toPath();
Files.deleteIfExists(walTmpPath);
Files.createFile(walTmpPath);
@@ -74,37 +73,4 @@ public class ProcedureWAL {
Files.deleteIfExists(walFilePath);
Files.move(walTmpPath, walFilePath);
}
-
- /**
- * Load wal files into memory
- *
- * @param procedureList procedure list
- */
- public void load(List<Procedure> procedureList) {
- Procedure procedure = null;
- try (FileInputStream fis = new FileInputStream(walFilePath.toFile());
- FileChannel channel = fis.getChannel()) {
- ByteBuffer byteBuffer = ByteBuffer.allocate(PROCEDURE_WAL_BUFFER_SIZE);
- if (channel.read(byteBuffer) > 0) {
- byteBuffer.flip();
- procedure = procedureFactory.create(byteBuffer);
- byteBuffer.clear();
- }
- procedureList.add(procedure);
- } catch (IOException e) {
- LOG.error("Load {} failed, it will be deleted.", walFilePath, e);
- if (!walFilePath.toFile().delete()) {
- LOG.error("{} delete failed; take appropriate action.", walFilePath,
e);
- }
- }
- }
-
- public void delete() {
- try {
- Files.deleteIfExists(Paths.get(walFilePath + TMP_SUFFIX));
- Files.deleteIfExists(walFilePath);
- } catch (IOException e) {
- LOG.error("Delete procedure wal failed.");
- }
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/TestOnlyPlan.java
similarity index 60%
copy from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
copy to
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/TestOnlyPlan.java
index 79161e97840..7ea10ef324c 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/consensus/request/TestOnlyPlan.java
@@ -17,33 +17,22 @@
* under the License.
*/
-package org.apache.iotdb.confignode.procedure.store;
+package org.apache.iotdb.confignode.consensus.request;
-import org.apache.iotdb.confignode.procedure.Procedure;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
-import java.util.List;
+public class TestOnlyPlan extends ConfigPhysicalPlan {
+ public TestOnlyPlan() {
+ super(ConfigPhysicalPlanType.TestOnly);
+ }
-public interface IProcedureStore {
+ @Override
+ protected void serializeImpl(DataOutputStream stream) throws IOException {
+ stream.writeShort(getType().getPlanType());
+ }
- boolean isRunning();
-
- void setRunning(boolean running);
-
- void load(List<Procedure> procedureList);
-
- void update(Procedure procedure);
-
- void update(Procedure[] subprocs);
-
- void delete(long procId);
-
- void delete(long[] childProcIds);
-
- void delete(long[] batchIds, int startIndex, int batchCount);
-
- void cleanup();
-
- void stop();
-
- void start();
+ @Override
+ protected void deserializeImpl(ByteBuffer buffer) throws IOException {}
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ProcedureInfoTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ProcedureInfoTest.java
new file mode 100644
index 00000000000..238adadb22c
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ProcedureInfoTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.FileUtils;
+import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.NeverFinishProcedure;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.stream.LongStream;
+
+import static org.apache.iotdb.db.utils.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class ProcedureInfoTest {
+ private static final ProcedureInfo procedureInfo = new ProcedureInfo(null);
+ private static final File snapshotDir = new File(BASE_OUTPUT_PATH,
"snapshot");
+
+ @BeforeClass
+ public static void setup() {
+ if (!snapshotDir.exists()) {
+ snapshotDir.mkdirs();
+ }
+ }
+
+ @AfterClass
+ public static void cleanup() {
+ if (snapshotDir.exists()) {
+ FileUtils.deleteDirectory(snapshotDir);
+ }
+ }
+
+ @Test
+ public void testProcedureId() {
+ final long lastProcedureId = 100;
+ LongStream.range(0, lastProcedureId)
+ .forEach(
+ id ->
+ procedureInfo.updateProcedure(
+ new UpdateProcedurePlan(new NeverFinishProcedure(id))));
+ Assert.assertEquals(procedureInfo.getNextProcId(), lastProcedureId);
+ }
+
+ @Test
+ public void testSnapshot() throws Exception {
+ procedureInfo.updateProcedure(new UpdateProcedurePlan(new
NeverFinishProcedure(1)));
+ procedureInfo.updateProcedure(new UpdateProcedurePlan(new
NeverFinishProcedure(100)));
+ procedureInfo.updateProcedure(new UpdateProcedurePlan(new
NeverFinishProcedure(99999)));
+ Assert.assertTrue(procedureInfo.processTakeSnapshot(snapshotDir));
+ ProcedureInfo procedureInfo1 = new ProcedureInfo(null);
+ procedureInfo1.processLoadSnapshot(snapshotDir);
+ Assert.assertEquals(procedureInfo, procedureInfo1);
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/NoopProcedureStore.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/NoopProcedureStore.java
index 07e35a6f269..749d8fa725a 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/NoopProcedureStore.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/NoopProcedureStore.java
@@ -19,12 +19,19 @@
package org.apache.iotdb.confignode.procedure;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.persistence.ProcedureInfo;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
+import java.util.Collections;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
+@TestOnly
public class NoopProcedureStore implements IProcedureStore {
+ AtomicLong lastProcId = new AtomicLong(-1);
+
private volatile boolean running = false;
@Override
@@ -38,7 +45,24 @@ public class NoopProcedureStore implements IProcedureStore {
}
@Override
- public void load(List<Procedure> procedureList) {}
+ public List<Procedure> load() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public List<Procedure> getProcedures() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public ProcedureInfo getProcedureInfo() {
+ return null;
+ }
+
+ @Override
+ public synchronized long getNextProcId() {
+ return lastProcId.addAndGet(1);
+ }
@Override
public void update(Procedure procedure) {}
@@ -67,4 +91,9 @@ public class NoopProcedureStore implements IProcedureStore {
public void start() {
running = true;
}
+
+ @Override
+ public boolean isOldVersionProcedureStore() {
+ return true;
+ }
}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/UpgradeFromWALToConsensusLayerTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/UpgradeFromWALToConsensusLayerTest.java
new file mode 100644
index 00000000000..1bbd8616c5f
--- /dev/null
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/UpgradeFromWALToConsensusLayerTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure;
+
+import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.consensus.request.TestOnlyPlan;
+import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
+import org.apache.iotdb.confignode.persistence.ProcedureInfo;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
+import org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodeProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.testonly.NeverFinishProcedure;
+import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
+import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
+import org.apache.iotdb.consensus.exception.ConsensusException;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+public class UpgradeFromWALToConsensusLayerTest {
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(UpgradeFromWALToConsensusLayerTest.class);
+ ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
+
+ private static final String DATA_DIR =
"data_UpgradeFromWALToConsensusLayerTest";
+
+ @Before
+ public void setUp() throws Exception {
+ conf.setConsensusDir(DATA_DIR + File.separator + conf.getConsensusDir());
+ conf.setSystemDir(DATA_DIR + File.separator + conf.getSystemDir());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ FileUtils.recursiveDeleteFolder(DATA_DIR);
+ conf.setConsensusDir(conf.getConsensusDir().replace(DATA_DIR +
File.separator, ""));
+ conf.setSystemDir(conf.getSystemDir().replace(DATA_DIR + File.separator,
""));
+ }
+
+ /**
+ * This test will fully start the ConfigManager, generating some files that
cannot be cleaned up,
+ * which will affect other tests. Therefore, this test is not enabled by
default
+ */
+ @Ignore
+ @Test
+ public void test() throws IOException, ConsensusException,
InterruptedException {
+ // start configManager for the first time
+ ConfigManager configManager = new ConfigManager();
+ conf.setConfigNodeId(0);
+ conf.setInternalAddress("127.0.0.1");
+ configManager.initConsensusManager();
+
+ // write some raft log to increase index, otherwise cannot take snapshot
+ configManager.getConsensusManager().write(new TestOnlyPlan());
+ configManager.getConsensusManager().write(new TestOnlyPlan());
+ configManager.getConsensusManager().write(new TestOnlyPlan());
+
+ ProcedureInfo procedureInfo =
configManager.getProcedureManager().getStore().getProcedureInfo();
+ ConfigProcedureStore.createOldProcWalDir();
+
+ // prepare procedures
+ RemoveDataNodeProcedure removeDataNodeProcedure =
+ new RemoveDataNodeProcedure(
+ new TDataNodeLocation(
+ 10000,
+ new TEndPoint("127.0.0.1", 6600),
+ new TEndPoint("127.0.0.1", 7700),
+ new TEndPoint("127.0.0.1", 8800),
+ new TEndPoint("127.0.0.1", 9900),
+ new TEndPoint("127.0.0.1", 11000)));
+ removeDataNodeProcedure.setProcId(10086);
+ AddConfigNodeProcedure addConfigNodeProcedure =
+ new AddConfigNodeProcedure(
+ new TConfigNodeLocation(
+ 0, new TEndPoint("0.0.0.0", 22277), new TEndPoint("0.0.0.0",
22278)),
+ new TNodeVersionInfo());
+ addConfigNodeProcedure.setProcId(888888);
+ List<Procedure> procedureList =
+ Arrays.asList(
+ new NeverFinishProcedure(0),
+ removeDataNodeProcedure,
+ new NeverFinishProcedure(199),
+ addConfigNodeProcedure,
+ new NeverFinishProcedure(29999));
+ procedureList.forEach(
+ procedure -> procedureInfo.oldUpdateProcedure(new
UpdateProcedurePlan(procedure)));
+
+ // take snapshot manually
+ procedureInfo.oldLoad();
+ procedureInfo.upgrade();
+ // check if wal files deleted
+ Assert.assertFalse(procedureInfo.isOldVersion());
+
+ // reactivate configManager to load snapshot
+ configManager.close();
+ configManager = new ConfigManager();
+ configManager.initConsensusManager();
+ // check procedures which loaded from snapshot
+ List<Procedure<ConfigNodeProcedureEnv>> newProcedureList =
+
configManager.getProcedureManager().getStore().getProcedureInfo().getProcedures();
+ Assert.assertEquals(procedureList.size(), newProcedureList.size());
+ Assert.assertTrue(newProcedureList.containsAll(procedureList));
+ }
+}
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/util/ProcedureTestUtil.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/util/ProcedureTestUtil.java
index fbabd754ff3..8682df7617c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/util/ProcedureTestUtil.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/util/ProcedureTestUtil.java
@@ -20,8 +20,6 @@
package org.apache.iotdb.confignode.procedure.util;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
-import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
-import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
import java.util.concurrent.TimeUnit;
@@ -54,13 +52,4 @@ public class ProcedureTestUtil {
Thread.currentThread().interrupt();
}
}
-
- public static void stopService(
- ProcedureExecutor procExecutor, ProcedureScheduler scheduler,
IProcedureStore store) {
- procExecutor.stop();
- procExecutor.join();
- scheduler.clear();
- scheduler.stop();
- store.stop();
- }
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
index c627692b72d..0a798bee793 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ratis/RatisConsensus.java
@@ -678,10 +678,9 @@ class RatisConsensus implements IConsensus {
throw new ConsensusGroupNotExistException(groupId);
}
- // TODO tuning snapshot create timeout
SnapshotManagementRequest request =
SnapshotManagementRequest.newCreate(
- localFakeId, myself.getId(), raftGroupId,
localFakeCallId.incrementAndGet(), 30000);
+ localFakeId, myself.getId(), raftGroupId,
localFakeCallId.incrementAndGet(), 300000);
RaftClientReply reply;
try {