This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 5f796ba7c8d Add recover IT for Procedure, and delete ProcedureStore (#12045)
5f796ba7c8d is described below

commit 5f796ba7c8d61e2179d5d4dae753b9830ede07c2
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Feb 23 18:41:33 2024 +0800

    Add recover IT for Procedure, and delete ProcedureStore (#12045)
---
 .../confignode/it/procedure/IoTDBProcedureIT.java  | 137 +++++++++++++++
 .../iotdb/confignode/manager/ConfigManager.java    |   7 +
 .../apache/iotdb/confignode/manager/IManager.java  |   9 +
 .../iotdb/confignode/manager/ProcedureManager.java |  13 +-
 .../confignode/persistence/ProcedureInfo.java      |   7 +-
 .../iotdb/confignode/procedure/Procedure.java      |  15 ++
 .../confignode/procedure/ProcedureExecutor.java    |   2 +
 .../impl/CreateManyDatabasesProcedure.java         | 116 +++++++++++++
 .../procedure/store/ProcedureFactory.java          |   9 +-
 .../confignode/procedure/store/ProcedureStore.java | 192 ---------------------
 .../confignode/procedure/store/ProcedureType.java  |   5 +-
 .../thrift/ConfigNodeRPCServiceProcessor.java      |   5 +
 ...TestSTMProcedure.java => STMProcedureTest.java} |   2 +-
 .../procedure/store/TestProcedureStore.java        | 113 ------------
 .../iotdb/db/protocol/client/ConfigNodeClient.java |   6 +
 .../org/apache/iotdb/commons/utils/TestOnly.java   |   8 +-
 .../src/main/thrift/confignode.thrift              |   3 +
 17 files changed, 327 insertions(+), 322 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
new file mode 100644
index 00000000000..4d0f388a33b
--- /dev/null
+++ b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.procedure;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
+import org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure.MAX_STATE;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBProcedureIT {
+  private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBProcedureIT.class);
+
+  @Before
+  public void setUp() {
+    EnvFactory.getEnv()
+        .getConfig()
+        .getCommonConfig()
+        .setConfigNodeConsensusProtocolClass(RATIS_CONSENSUS)
+        .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+        .setDataRegionConsensusProtocolClass(RATIS_CONSENSUS)
+        .setDataReplicationFactor(1);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    EnvFactory.getEnv().cleanClusterEnvironment();
+  }
+
+  /**
+   * While CreateManyDatabasesProcedure is executing, we expect the procedure to be interrupted
+   * exactly once, so the leader is shut down in the middle of the procedure.
+   */
+  @Test
+  public void procedureRecoverAtAnotherConfigNodeTest() throws Exception {
+    recoverTest(3, false);
+  }
+
+  @Test
+  public void procedureRecoverAtTheSameConfigNodeTest() throws Exception {
+    recoverTest(1, true);
+  }
+
+  /**
+   * Submits a CreateManyDatabasesProcedure, stops the leader ConfigNode while the procedure is
+   * unfinished (optionally restarting it), then waits up to one minute for all expected databases.
+   */
+  private void recoverTest(int configNodeNum, boolean needRestartLeader) throws Exception {
+    EnvFactory.getEnv().initClusterEnvironment(configNodeNum, 1);
+
+    // prepare expectedDatabases
+    Set<String> expectedDatabases = new HashSet<>();
+    for (int id = CreateManyDatabasesProcedure.INITIAL_STATE; id < MAX_STATE; id++) {
+      expectedDatabases.add(CreateManyDatabasesProcedure.DATABASE_NAME_PREFIX + id);
+    }
+    Assert.assertEquals(MAX_STATE, expectedDatabases.size());
+
+    SyncConfigNodeIServiceClient leaderClient =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection();
+    leaderClient.createManyDatabases();
+
+    // prepare req
+    final TGetDatabaseReq req =
+        new TGetDatabaseReq(
+            Arrays.asList(
+                new ShowDatabaseStatement(new PartialPath(SqlConstant.getSingleRootArray()))
+                    .getPathPattern()
+                    .getNodes()),
+            SchemaConstant.ALL_MATCH_SCOPE.serialize());
+
+    // Make sure the procedure has not finished yet
+    TShowDatabaseResp resp = leaderClient.showDatabase(req);
+    Assert.assertTrue(resp.getDatabaseInfoMap().size() < MAX_STATE);
+    // Then shut down the leader, wait for a new leader to exist and the procedure to continue
+    final int oldLeaderIndex = EnvFactory.getEnv().getLeaderConfigNodeIndex();
+    EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stop();
+    if (needRestartLeader) {
+      EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).start();
+    }
+    SyncConfigNodeIServiceClient newLeaderClient =
+        (SyncConfigNodeIServiceClient) EnvFactory.getEnv().getLeaderConfigNodeConnection();
+    // Awaitility polls this check repeatedly, so it must stay free of side effects: compare the
+    // reported database names with expectedDatabases instead of removing entries from the set.
+    Callable<Boolean> finalCheck =
+        () -> {
+          TShowDatabaseResp resp1 = newLeaderClient.showDatabase(req);
+          return MAX_STATE == resp1.getDatabaseInfoMap().size()
+              && resp1.getDatabaseInfoMap().keySet().containsAll(expectedDatabases);
+        };
+    Awaitility.await().atMost(1, TimeUnit.MINUTES).until(finalCheck);
+  }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ae88161b142..760de526241 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.utils.AuthUtils;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
@@ -605,6 +606,12 @@ public class ConfigManager implements IManager {
     }
   }
 
+  @TestOnly
+  @Override
+  public TSStatus createManyDatabases() {
+    return getProcedureManager().createManyDatabases();
+  }
+
   private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path, PartialPath database) {
     // The path contains `**`
     if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 56ed18cf566..e9e310bc342 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
 import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
 import org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
 import org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
@@ -314,6 +315,14 @@ public interface IManager {
    */
   TSStatus deleteDatabases(List<String> deletedPaths);
 
+  /**
+   * Create many databases.
+   *
+   * @return status
+   */
+  @TestOnly
+  TSStatus createManyDatabases();
+
   /**
    * Get SchemaPartition.
    *
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 0fe46a5fc68..4a3a2547cea 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.trigger.TriggerInformation;
 import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -50,6 +51,7 @@ import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
 import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
 import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -81,7 +83,6 @@ import org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
 import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
 import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
 import org.apache.iotdb.confignode.procedure.store.ProcedureType;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
 import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
@@ -172,6 +173,12 @@ public class ProcedureManager {
     }
   }
 
+  @TestOnly
+  public TSStatus createManyDatabases() {
+    this.executor.submitProcedure(new CreateManyDatabasesProcedure());
+    return StatusUtils.OK;
+  }
+
   public TSStatus deleteDatabases(ArrayList<TDatabaseSchema> deleteSgSchemaList) {
     List<Long> procedureIds = new ArrayList<>();
     for (TDatabaseSchema storageGroupSchema : deleteSgSchemaList) {
@@ -1023,10 +1030,6 @@ public class ProcedureManager {
     return store;
   }
 
-  public void setStore(ProcedureStore store) {
-    this.store = store;
-  }
-
   public ConfigNodeProcedureEnv getEnv() {
     return env;
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index aebab36ddbd..fe181ef382e 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -25,7 +25,6 @@ import org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProce
 import org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
 import org.apache.iotdb.confignode.procedure.Procedure;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -44,6 +43,8 @@ public class ProcedureInfo {
 
   private static final Logger LOG = LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+
   private final ProcedureFactory procedureFactory = ProcedureFactory.getInstance();
   private final String procedureWalDir =
       CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
@@ -51,7 +52,7 @@ public class ProcedureInfo {
 
   public void load(List<Procedure> procedureList) {
     try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
-      s.filter(path -> path.getFileName().toString().endsWith(ProcedureStore.PROCEDURE_WAL_SUFFIX))
+      s.filter(path -> path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
           .sorted(
               (p1, p2) ->
                   Long.compareUnsigned(
@@ -74,7 +75,7 @@ public class ProcedureInfo {
   public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
     Procedure procedure = updateProcedurePlan.getProcedure();
     long procId = procedure.getProcId();
-    Path path = Paths.get(procedureWalDir, procId + ProcedureStore.PROCEDURE_WAL_SUFFIX);
+    Path path = Paths.get(procedureWalDir, procId + PROCEDURE_WAL_SUFFIX);
     ProcedureWAL procedureWAL =
         procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path, procedureFactory));
     try {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 0b90e33dad1..53bdc7774a3 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -52,6 +52,11 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
   private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
   public static final long NO_PROC_ID = -1;
   public static final long NO_TIMEOUT = -1;
+  /**
+   * The isDeserialized of a newly created procedure is false. When a leader switch or ConfigNode
+   * restart occurs during the execution of the procedure, isDeserialized becomes true.
+   */
+  private boolean isDeserialized = false;
 
   private long parentProcId = NO_PROC_ID;
   private long rootProcId = NO_PROC_ID;
@@ -188,6 +193,8 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
   public void deserialize(ByteBuffer byteBuffer) {
     // procid
     this.setProcId(byteBuffer.getLong());
+    // isDeserialized
+    this.setDeserialized(true);
     // state
     this.setState(ProcedureState.values()[byteBuffer.getInt()]);
     //  submit time
@@ -539,6 +546,10 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
     return procId;
   }
 
+  public boolean isDeserialized() {
+    return isDeserialized;
+  }
+
   public boolean hasParent() {
     return parentProcId != NO_PROC_ID;
   }
@@ -564,6 +575,10 @@ public abstract class Procedure<Env> implements Comparable<Procedure<Env>> {
     this.procId = procId;
   }
 
+  private void setDeserialized(boolean isDeserialized) {
+    this.isDeserialized = isDeserialized;
+  }
+
   public void setProcRunnable() {
     this.submittedTime = System.currentTimeMillis();
     setState(ProcedureState.RUNNABLE);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index ffc205dcbfc..4afb5b0eb32 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure;
 
 import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
 import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
@@ -88,6 +89,7 @@ public class ProcedureExecutor<Env> {
     this.lastProcId.incrementAndGet();
   }
 
+  @TestOnly
   public ProcedureExecutor(final Env environment, final IProcedureStore store) {
     this(environment, store, new SimpleProcedureScheduler());
   }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
new file mode 100644
index 00000000000..9721e4afb88
--- /dev/null
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * Test-only procedure that creates MAX_STATE databases, one per state. A newly submitted instance
+ * sleeps forever before creating the last database, so the ConfigNode leader must be shut down
+ * externally; only an instance restored by deserialization (leader switch or restart) can finish.
+ */
+@TestOnly
+public class CreateManyDatabasesProcedure
+    extends StateMachineProcedure<ConfigNodeProcedureEnv, Integer> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CreateManyDatabasesProcedure.class);
+  public static final int INITIAL_STATE = 0;
+  public static final int MAX_STATE = 100;
+  public static final String DATABASE_NAME_PREFIX = "root.test_";
+  public static final long SLEEP_FOREVER = Long.MAX_VALUE;
+  private boolean createFailedOnce = false;
+
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv configNodeProcedureEnv, Integer state)
+      throws InterruptedException {
+    if (state >= MAX_STATE) {
+      return Flow.NO_MORE_STATE;
+    }
+    // A never-deserialized instance parks at the last state so the test can stop the leader.
+    if (state == MAX_STATE - 1 && !isDeserialized()) {
+      Thread.sleep(SLEEP_FOREVER);
+    }
+    try {
+      createDatabase(configNodeProcedureEnv, state);
+    } catch (ProcedureException e) {
+      setFailure(e);
+      return Flow.NO_MORE_STATE;
+    }
+    setNextState(state + 1);
+    return Flow.HAS_MORE_STATE;
+  }
+
+  private void createDatabase(ConfigNodeProcedureEnv env, int id) throws ProcedureException {
+    TDatabaseSchema schema = new TDatabaseSchema(DATABASE_NAME_PREFIX + id);
+    DatabaseSchemaPlan plan =
+        new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase, schema);
+    TSStatus status = env.getConfigManager().setDatabase(plan);
+    if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() == status.getCode()) {
+      // First mistakes are forgivable, but a second signals a problem.
+      if (createFailedOnce) {
+        throw new ProcedureException("createDatabase fail twice");
+      }
+      createFailedOnce = true;
+      return;
+    }
+    if (TSStatusCode.SUCCESS_STATUS.getStatusCode() != status.getCode()) {
+      throw new ProcedureException("Unexpected fail, tsStatus is " + status);
+    }
+  }
+
+  @Override
+  protected void rollbackState(ConfigNodeProcedureEnv configNodeProcedureEnv, Integer integer)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  @Override
+  protected Integer getState(int stateId) {
+    return stateId;
+  }
+
+  @Override
+  protected int getStateId(Integer integer) {
+    return integer;
+  }
+
+  @Override
+  protected Integer getInitialState() {
+    return INITIAL_STATE;
+  }
+
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.CREATE_MANY_DATABASES_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+  }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index a4257a133c4..0ad156e8253 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.confignode.procedure.store;
 
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
 import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
 import org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -165,6 +166,9 @@ public class ProcedureFactory implements IProcedureFactory {
       case AUTH_OPERATE_PROCEDURE:
         procedure = new AuthOperationProcedure();
         break;
+      case CREATE_MANY_DATABASES_PROCEDURE:
+        procedure = new CreateManyDatabasesProcedure();
+        break;
       default:
         LOGGER.error("unknown Procedure type: " + typeCode);
         throw new IOException("unknown Procedure type: " + typeCode);
@@ -234,8 +238,11 @@ public class ProcedureFactory implements IProcedureFactory {
       return ProcedureType.ALTER_LOGICAL_VIEW_PROCEDURE;
     } else if (procedure instanceof AuthOperationProcedure) {
       return ProcedureType.AUTH_OPERATE_PROCEDURE;
+    } else if (procedure instanceof CreateManyDatabasesProcedure) {
+      return ProcedureType.CREATE_MANY_DATABASES_PROCEDURE;
     }
-    return null;
+    throw new UnsupportedOperationException(
+        "Procedure type " + procedure.getClass() + " is not supported");
   }
 
   private static class ProcedureFactoryHolder {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureStore.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureStore.java
deleted file mode 100644
index 123199d8411..00000000000
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureStore.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.store;
-
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.procedure.Procedure;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
-
-public class ProcedureStore implements IProcedureStore {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ProcedureStore.class);
-  private String procedureWalDir =
-      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
-  private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new ConcurrentHashMap<>();
-  public static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
-  private final IProcedureFactory procedureFactory;
-  private volatile boolean isRunning = false;
-
-  public ProcedureStore(IProcedureFactory procedureFactory) {
-    try {
-      this.procedureFactory = procedureFactory;
-      Files.createDirectories(Paths.get(procedureWalDir));
-    } catch (IOException e) {
-      throw new RuntimeException("Create procedure wal directory failed.", e);
-    }
-  }
-
-  @TestOnly
-  public ProcedureStore(String testWALDir, IProcedureFactory procedureFactory) {
-    this.procedureFactory = procedureFactory;
-    try {
-      Files.createDirectories(Paths.get(testWALDir));
-      procedureWalDir = testWALDir;
-    } catch (IOException e) {
-      throw new RuntimeException("Create procedure wal directory failed.", e);
-    }
-  }
-
-  public boolean isRunning() {
-    return this.isRunning;
-  }
-
-  public void setRunning(boolean running) {
-    isRunning = running;
-  }
-
-  /**
-   * Load procedure wal files into memory.
-   *
-   * @param procedureList procedureList
-   */
-  public void load(List<Procedure> procedureList) {
-    try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
-      s.filter(path -> path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
-          .sorted(
-              (p1, p2) ->
-                  Long.compareUnsigned(
-                      Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
-                      Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
-          .forEach(
-              path -> {
-                String fileName = path.getFileName().toString();
-                long procId = Long.parseLong(fileName.split("\\.")[0]);
-                ProcedureWAL procedureWAL =
-                    procWALMap.computeIfAbsent(
-                        procId, id -> new ProcedureWAL(path, procedureFactory));
-                procedureWAL.load(procedureList);
-              });
-    } catch (IOException e) {
-      LOG.error("Load procedure wal failed.", e);
-    }
-  }
-
-  /**
-   * Update procedure, roughly delete and create a new wal file.
-   *
-   * @param procedure procedure
-   */
-  public void update(Procedure procedure) {
-    if (!procedure.needPersistance()) {
-      procWALMap.remove(procedure.getProcId());
-      return;
-    }
-    long procId = procedure.getProcId();
-    Path path = Paths.get(procedureWalDir, procId + ProcedureStore.PROCEDURE_WAL_SUFFIX);
-    ProcedureWAL procedureWAL =
-        procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path, procedureFactory));
-    try {
-      procedureWAL.save(procedure);
-    } catch (IOException e) {
-      LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId());
-    }
-  }
-
-  /**
-   * Batch update
-   *
-   * @param subprocs procedure array
-   */
-  public void update(Procedure[] subprocs) {
-    for (Procedure subproc : subprocs) {
-      update(subproc);
-    }
-  }
-
-  /**
-   * Delete procedure wal file
-   *
-   * @param procId procedure id
-   */
-  public void delete(long procId) {
-    ProcedureWAL procedureWAL = procWALMap.get(procId);
-    if (procedureWAL != null) {
-      procedureWAL.delete();
-    }
-    procWALMap.remove(procId);
-  }
-
-  /**
-   * Batch delete
-   *
-   * @param childProcIds procedure id array
-   */
-  public void delete(long[] childProcIds) {
-    for (long childProcId : childProcIds) {
-      delete(childProcId);
-    }
-  }
-
-  /**
-   * Batch delete by index
-   *
-   * @param batchIds batchIds
-   * @param startIndex start index
-   * @param batchCount delete procedure count
-   */
-  public void delete(long[] batchIds, int startIndex, int batchCount) {
-    for (int i = startIndex; i < batchCount; i++) {
-      delete(batchIds[i]);
-    }
-  }
-
-  /** clean all the wal, used for unit test. */
-  public void cleanup() {
-    try {
-      FileUtils.cleanDirectory(new File(procedureWalDir));
-    } catch (IOException e) {
-      LOG.error("Clean wal directory failed", e);
-    }
-  }
-
-  public void stop() {
-    isRunning = false;
-  }
-
-  @Override
-  public void start() {
-    if (!isRunning) {
-      isRunning = true;
-    }
-  }
-}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index b539ffc46df..f7e7c68a584 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -30,12 +30,11 @@ public enum ProcedureType {
   /** DataNode */
   REMOVE_DATA_NODE_PROCEDURE((short) 100),
 
-  /** StorageGroup */
+  /** StorageGroup and Region */
   DELETE_STORAGE_GROUP_PROCEDURE((short) 200),
-
-  /** Region */
   REGION_MIGRATE_PROCEDURE((short) 201),
   CREATE_REGION_GROUPS((short) 202),
+  CREATE_MANY_DATABASES_PROCEDURE((short) 203),
 
   /** Timeseries */
   DELETE_TIMESERIES_PROCEDURE((short) 300),
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 9d35842edf2..95c82a1c001 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -480,6 +480,11 @@ public class ConfigNodeRPCServiceProcessor implements IConfigNodeRPCService.Ifac
     return databaseSchemaResp.convertToRPCStorageGroupSchemaResp();
   }
 
+  @Override
+  public TSStatus createManyDatabases() throws TException {
+    return configManager.createManyDatabases();
+  }
+
   @Override
   public TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq req) {
     PathPatternTree patternTree =
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/STMProcedureTest.java
similarity index 97%
rename from iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
rename to iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/STMProcedureTest.java
index 7b387d3d438..3ccbb2e1516 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/STMProcedureTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
 
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class TestSTMProcedure extends TestProcedureBase {
+public class STMProcedureTest extends TestProcedureBase {
 
   @Test
   public void testSubmitProcedure() {
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
deleted file mode 100644
index 9e1139adc30..00000000000
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.store;
-
-import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
-import org.apache.iotdb.confignode.procedure.TestProcedureBase;
-import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
-import org.apache.iotdb.confignode.procedure.entity.StuckSTMProcedure;
-import org.apache.iotdb.confignode.procedure.entity.TestProcedureFactory;
-import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
-import org.apache.iotdb.confignode.procedure.state.ProcedureState;
-import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TestProcedureStore extends TestProcedureBase {
-
-  private static final String TEST_DIR = "./target/testWAL/";
-  private static final int WORK_THREAD = 2;
-  private IProcedureFactory factory = new TestProcedureFactory();
-
-  @Override
-  protected void initExecutor() {
-    this.env = new TestProcEnv();
-    this.procStore = new ProcedureStore(TEST_DIR, factory);
-    this.procExecutor = new ProcedureExecutor<>(env, procStore);
-    this.env.setScheduler(this.procExecutor.getScheduler());
-    this.procExecutor.init(WORK_THREAD);
-  }
-
-  @Test
-  public void testUpdate() {
-    ProcedureStore procedureStore = new ProcedureStore(TEST_DIR, factory);
-    IncProcedure incProcedure = new IncProcedure();
-    procedureStore.update(incProcedure);
-    List<Procedure> procedureList = new ArrayList<>();
-    procedureStore.load(procedureList);
-    assertProc(
-        incProcedure,
-        procedureList.get(0).getClass(),
-        procedureList.get(0).getProcId(),
-        procedureList.get(0).getState());
-    this.procStore.cleanup();
-    try {
-      FileUtils.cleanDirectory(new File(TEST_DIR));
-    } catch (IOException e) {
-      System.out.println("clean dir failed." + e);
-    }
-  }
-
-  @Test
-  public void testChildProcedureLoad() {
-    int childCount = 10;
-    StuckSTMProcedure STMProcedure = new StuckSTMProcedure(childCount);
-    long rootId = procExecutor.submitProcedure(STMProcedure);
-    ProcedureTestUtil.sleepWithoutInterrupt(50);
-    // stop service
-    ProcedureTestUtil.stopService(procExecutor, procExecutor.getScheduler(), procStore);
-    ConcurrentHashMap<Long, Procedure> procedures = procExecutor.getProcedures();
-    ProcedureStore procedureStore = new ProcedureStore(TEST_DIR, new TestProcedureFactory());
-    List<Procedure> procedureList = new ArrayList<>();
-    procedureStore.load(procedureList);
-    Assert.assertEquals(childCount + 1, procedureList.size());
-    for (int i = 0; i < procedureList.size(); i++) {
-      Procedure procedure = procedureList.get(i);
-      assertProc(
-          procedure,
-          procedures.get(procedure.getProcId()).getClass(),
-          i + 1,
-          procedures.get(procedure.getProcId()).getState());
-    }
-    // restart service
-    initExecutor();
-    this.procStore.start();
-    this.procExecutor.startWorkers();
-
-    ProcedureTestUtil.waitForProcedure(procExecutor, rootId);
-    Assert.assertEquals(
-        procExecutor.getResultOrProcedure(rootId).getState(), ProcedureState.SUCCESS);
-  }
-
-  private void assertProc(Procedure proc, Class clazz, long procId, ProcedureState state) {
-    Assert.assertEquals(clazz, proc.getClass());
-    Assert.assertEquals(procId, proc.getProcId());
-    Assert.assertEquals(state, proc.getState());
-  }
-}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 6cb1373fd17..1387e5f338f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -471,6 +471,12 @@ public class ConfigNodeClient implements IConfigNodeRPCService.Iface, ThriftClie
         () -> client.getMatchedDatabaseSchemas(req), resp -> !updateConfigNodeLeader(resp.status));
   }
 
+  @Override
+  public TSStatus createManyDatabases() throws TException {
+    return executeRemoteCallWithRetry(
+        () -> client.createManyDatabases(), status -> !updateConfigNodeLeader(status));
+  }
+
   @Override
   public TSStatus setTTL(TSetTTLReq setTTLReq) throws TException {
     return executeRemoteCallWithRetry(
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
index 05ade0b21c8..089d906e91c 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
@@ -25,10 +25,10 @@ import java.lang.annotation.Retention;
 import java.lang.annotation.RetentionPolicy;
 import java.lang.annotation.Target;
 
-@Target({ElementType.METHOD, ElementType.CONSTRUCTOR})
-@Retention(RetentionPolicy.SOURCE)
 /**
- * TestOnly implies that the method should only be used in the tests, otherwise its functionality is
- * not guaranteed and may interfere with the normal code.
+ * TestOnly implies that the class or method should only be used in the tests, otherwise its
+ * functionality is not guaranteed and may interfere with the normal code.
  */
+@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
+@Retention(RetentionPolicy.SOURCE)
 public @interface TestOnly {}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index a04cfe588a5..989255c0cc8 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -945,6 +945,9 @@ service IConfigNodeRPCService {
   /** Get the matched Databases' TDatabaseSchema */
   TDatabaseSchemaResp getMatchedDatabaseSchemas(TGetDatabaseReq req)
 
+  /** Test only */
+  common.TSStatus createManyDatabases()
+
   // ======================================================
   // SchemaPartition
   // ======================================================


Reply via email to