This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5f796ba7c8d Add recover IT for Procedure, and delete ProcedureStore
(#12045)
5f796ba7c8d is described below
commit 5f796ba7c8d61e2179d5d4dae753b9830ede07c2
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Feb 23 18:41:33 2024 +0800
Add recover IT for Procedure, and delete ProcedureStore (#12045)
---
.../confignode/it/procedure/IoTDBProcedureIT.java | 137 +++++++++++++++
.../iotdb/confignode/manager/ConfigManager.java | 7 +
.../apache/iotdb/confignode/manager/IManager.java | 9 +
.../iotdb/confignode/manager/ProcedureManager.java | 13 +-
.../confignode/persistence/ProcedureInfo.java | 7 +-
.../iotdb/confignode/procedure/Procedure.java | 15 ++
.../confignode/procedure/ProcedureExecutor.java | 2 +
.../impl/CreateManyDatabasesProcedure.java | 116 +++++++++++++
.../procedure/store/ProcedureFactory.java | 9 +-
.../confignode/procedure/store/ProcedureStore.java | 192 ---------------------
.../confignode/procedure/store/ProcedureType.java | 5 +-
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 +
...TestSTMProcedure.java => STMProcedureTest.java} | 2 +-
.../procedure/store/TestProcedureStore.java | 113 ------------
.../iotdb/db/protocol/client/ConfigNodeClient.java | 6 +
.../org/apache/iotdb/commons/utils/TestOnly.java | 8 +-
.../src/main/thrift/confignode.thrift | 3 +
17 files changed, 327 insertions(+), 322 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
new file mode 100644
index 00000000000..4d0f388a33b
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.procedure;
+
+import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.schema.SchemaConstant;
+import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
+import org.apache.iotdb.confignode.rpc.thrift.TGetDatabaseReq;
+import org.apache.iotdb.confignode.rpc.thrift.TShowDatabaseResp;
+import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowDatabaseStatement;
+import org.apache.iotdb.db.utils.constant.SqlConstant;
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.awaitility.Awaitility;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure.MAX_STATE;
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class IoTDBProcedureIT {
+ private static Logger LOGGER =
LoggerFactory.getLogger(IoTDBProcedureIT.class);
+
+ @Before
+ public void setUp() {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setDataReplicationFactor(1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ /**
+ * During CreateManyDatabasesProcedure executing, we expect that the
procedure will be interrupted
+ * only once. So let us shutdown the leader at the middle of the procedure.
+ */
+ @Test
+ public void procedureRecoverAtAnotherConfigNodeTest() throws Exception {
+ recoverTest(3, false);
+ }
+
+ @Test
+ public void procedureRecoverAtTheSameConfigNodeTest() throws Exception {
+ recoverTest(1, true);
+ }
+
+ private void recoverTest(int configNodeNum, boolean needRestartLeader)
throws Exception {
+ EnvFactory.getEnv().initClusterEnvironment(configNodeNum, 1);
+
+ // prepare expectedDatabases
+ Set<String> expectedDatabases = new HashSet<>();
+ for (int id = CreateManyDatabasesProcedure.INITIAL_STATE; id < MAX_STATE;
id++) {
+ expectedDatabases.add(CreateManyDatabasesProcedure.DATABASE_NAME_PREFIX
+ id);
+ }
+ Assert.assertEquals(MAX_STATE, expectedDatabases.size());
+
+ SyncConfigNodeIServiceClient leaderClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection();
+ leaderClient.createManyDatabases();
+
+ // prepare req
+ final TGetDatabaseReq req =
+ new TGetDatabaseReq(
+ Arrays.asList(
+ new ShowDatabaseStatement(new
PartialPath(SqlConstant.getSingleRootArray()))
+ .getPathPattern()
+ .getNodes()),
+ SchemaConstant.ALL_MATCH_SCOPE.serialize());
+
+ // Make sure the procedure has not finished yet
+ TShowDatabaseResp resp = leaderClient.showDatabase(req);
+ Assert.assertTrue(resp.getDatabaseInfoMap().size() < MAX_STATE);
+ // Then shutdown the leader, wait the new leader exist and the procedure
continue
+ final int oldLeaderIndex = EnvFactory.getEnv().getLeaderConfigNodeIndex();
+ EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stop();
+ if (needRestartLeader) {
+ EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).start();
+ }
+ SyncConfigNodeIServiceClient newLeaderClient =
+ (SyncConfigNodeIServiceClient)
EnvFactory.getEnv().getLeaderConfigNodeConnection();
+ Callable<Boolean> finalCheck =
+ () -> {
+ TShowDatabaseResp resp1 = newLeaderClient.showDatabase(req);
+ if (MAX_STATE != resp1.getDatabaseInfoMap().size()) {
+ return false;
+ }
+ resp1
+ .getDatabaseInfoMap()
+ .keySet()
+ .forEach(databaseName -> expectedDatabases.remove(databaseName));
+ return expectedDatabases.isEmpty();
+ };
+ Awaitility.await().atMost(1, TimeUnit.MINUTES).until(finalCheck);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index ae88161b142..760de526241 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -45,6 +45,7 @@ import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.AuthUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.conf.SystemPropertiesUtils;
@@ -605,6 +606,12 @@ public class ConfigManager implements IManager {
}
}
+ @TestOnly
+ @Override
+ public TSStatus createManyDatabases() {
+ return getProcedureManager().createManyDatabases();
+ }
+
private List<TSeriesPartitionSlot> calculateRelatedSlot(PartialPath path,
PartialPath database) {
// The path contains `**`
if (path.getFullPath().contains(IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD)) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 56ed18cf566..e9e310bc342 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.CountDatabasePlan;
import
org.apache.iotdb.confignode.consensus.request.read.database.GetDatabasePlan;
@@ -314,6 +315,14 @@ public interface IManager {
*/
TSStatus deleteDatabases(List<String> deletedPaths);
+ /**
+ * Create many databases.
+ *
+ * @return status
+ */
+ @TestOnly
+ TSStatus createManyDatabases();
+
/**
* Get SchemaPartition.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 0fe46a5fc68..4a3a2547cea 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.commons.schema.view.viewExpression.ViewExpression;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.utils.StatusUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.auth.AuthorPlan;
@@ -50,6 +51,7 @@ import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -81,7 +83,6 @@ import
org.apache.iotdb.confignode.procedure.scheduler.SimpleProcedureScheduler;
import org.apache.iotdb.confignode.procedure.store.ConfigProcedureStore;
import org.apache.iotdb.confignode.procedure.store.IProcedureStore;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
import org.apache.iotdb.confignode.rpc.thrift.TAlterLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TAlterPipeReq;
@@ -172,6 +173,12 @@ public class ProcedureManager {
}
}
+ @TestOnly
+ public TSStatus createManyDatabases() {
+ this.executor.submitProcedure(new CreateManyDatabasesProcedure());
+ return StatusUtils.OK;
+ }
+
public TSStatus deleteDatabases(ArrayList<TDatabaseSchema>
deleteSgSchemaList) {
List<Long> procedureIds = new ArrayList<>();
for (TDatabaseSchema storageGroupSchema : deleteSgSchemaList) {
@@ -1023,10 +1030,6 @@ public class ProcedureManager {
return store;
}
- public void setStore(ProcedureStore store) {
- this.store = store;
- }
-
public ConfigNodeProcedureEnv getEnv() {
return env;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
index aebab36ddbd..fe181ef382e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java
@@ -25,7 +25,6 @@ import
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProce
import
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
-import org.apache.iotdb.confignode.procedure.store.ProcedureStore;
import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -44,6 +43,8 @@ public class ProcedureInfo {
private static final Logger LOG =
LoggerFactory.getLogger(ProcedureInfo.class);
+ private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+
private final ProcedureFactory procedureFactory =
ProcedureFactory.getInstance();
private final String procedureWalDir =
CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
@@ -51,7 +52,7 @@ public class ProcedureInfo {
public void load(List<Procedure> procedureList) {
try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
- s.filter(path ->
path.getFileName().toString().endsWith(ProcedureStore.PROCEDURE_WAL_SUFFIX))
+ s.filter(path ->
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
.sorted(
(p1, p2) ->
Long.compareUnsigned(
@@ -74,7 +75,7 @@ public class ProcedureInfo {
public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
Procedure procedure = updateProcedurePlan.getProcedure();
long procId = procedure.getProcId();
- Path path = Paths.get(procedureWalDir, procId +
ProcedureStore.PROCEDURE_WAL_SUFFIX);
+ Path path = Paths.get(procedureWalDir, procId + PROCEDURE_WAL_SUFFIX);
ProcedureWAL procedureWAL =
procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path,
procedureFactory));
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
index 0b90e33dad1..53bdc7774a3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/Procedure.java
@@ -52,6 +52,11 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
private static final Logger LOG = LoggerFactory.getLogger(Procedure.class);
public static final long NO_PROC_ID = -1;
public static final long NO_TIMEOUT = -1;
+ /**
+ * The isDeserialized of a newly created procedure is false. When a leader
switch or ConfigNode
+ * restart occurs during the execution of the procedure, isDeserialized
becomes true.
+ */
+ private boolean isDeserialized = false;
private long parentProcId = NO_PROC_ID;
private long rootProcId = NO_PROC_ID;
@@ -188,6 +193,8 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
public void deserialize(ByteBuffer byteBuffer) {
// procid
this.setProcId(byteBuffer.getLong());
+ // isDeserialized
+ this.setDeserialized(true);
// state
this.setState(ProcedureState.values()[byteBuffer.getInt()]);
// submit time
@@ -539,6 +546,10 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
return procId;
}
+ public boolean isDeserialized() {
+ return isDeserialized;
+ }
+
public boolean hasParent() {
return parentProcId != NO_PROC_ID;
}
@@ -564,6 +575,10 @@ public abstract class Procedure<Env> implements
Comparable<Procedure<Env>> {
this.procId = procId;
}
+ private void setDeserialized(boolean isDeserialized) {
+ this.isDeserialized = isDeserialized;
+ }
+
public void setProcRunnable() {
this.submittedTime = System.currentTimeMillis();
setState(ProcedureState.RUNNABLE);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
index ffc205dcbfc..4afb5b0eb32 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
@@ -88,6 +89,7 @@ public class ProcedureExecutor<Env> {
this.lastProcId.incrementAndGet();
}
+ @TestOnly
public ProcedureExecutor(final Env environment, final IProcedureStore store)
{
this(environment, store, new SimpleProcedureScheduler());
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
new file mode 100644
index 00000000000..9721e4afb88
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/CreateManyDatabasesProcedure.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
+import org.apache.iotdb.confignode.consensus.request.ConfigPhysicalPlanType;
+import
org.apache.iotdb.confignode.consensus.request.write.database.DatabaseSchemaPlan;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import
org.apache.iotdb.confignode.procedure.impl.statemachine.StateMachineProcedure;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+/**
+ * This procedure will create numerous databases (perhaps 100), during which
the confignode leader
+ * should be externally shutdown to test whether the procedure can be
correctly recovered after the
+ * leader change. The procedure will never finish until it's recovered from
another ConfigNode.
+ */
+@TestOnly
+public class CreateManyDatabasesProcedure
+ extends StateMachineProcedure<ConfigNodeProcedureEnv, Integer> {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(CreateManyDatabasesProcedure.class);
+ public static final int INITIAL_STATE = 0;
+ public static final int MAX_STATE = 100;
+ public static final String DATABASE_NAME_PREFIX = "root.test_";
+ public static final long SLEEP_FOREVER = Long.MAX_VALUE;
+ private boolean createFailedOnce = false;
+
+ @Override
+ protected Flow executeFromState(ConfigNodeProcedureEnv
configNodeProcedureEnv, Integer state)
+ throws InterruptedException {
+ if (state < MAX_STATE) {
+ if (state == MAX_STATE - 1 && !isDeserialized()) {
+ Thread.sleep(SLEEP_FOREVER);
+ }
+ try {
+ createDatabase(configNodeProcedureEnv, state);
+ } catch (ProcedureException e) {
+ setFailure(e);
+ return Flow.NO_MORE_STATE;
+ }
+ setNextState(state + 1);
+ return Flow.HAS_MORE_STATE;
+ }
+ return Flow.NO_MORE_STATE;
+ }
+
+ private void createDatabase(ConfigNodeProcedureEnv env, int id) throws
ProcedureException {
+ String databaseName = DATABASE_NAME_PREFIX + id;
+ TDatabaseSchema databaseSchema = new TDatabaseSchema(databaseName);
+ TSStatus status =
+ env.getConfigManager()
+ .setDatabase(
+ new DatabaseSchemaPlan(ConfigPhysicalPlanType.CreateDatabase,
databaseSchema));
+ if (TSStatusCode.DATABASE_ALREADY_EXISTS.getStatusCode() ==
status.getCode()) {
+ // First mistakes are forgivable, but a second signals a problem.
+ if (!createFailedOnce) {
+ createFailedOnce = true;
+ } else {
+ throw new ProcedureException("createDatabase fail twice");
+ }
+ } else if (TSStatusCode.SUCCESS_STATUS.getStatusCode() !=
status.getCode()) {
+ throw new ProcedureException("Unexpected fail, tsStatus is " + status);
+ }
+ }
+
+ @Override
+ protected void rollbackState(ConfigNodeProcedureEnv configNodeProcedureEnv,
Integer integer)
+ throws IOException, InterruptedException, ProcedureException {}
+
+ @Override
+ protected Integer getState(int stateId) {
+ return stateId;
+ }
+
+ @Override
+ protected int getStateId(Integer integer) {
+ return integer;
+ }
+
+ @Override
+ protected Integer getInitialState() {
+ return INITIAL_STATE;
+ }
+
+ @Override
+ public void serialize(DataOutputStream stream) throws IOException {
+
stream.writeShort(ProcedureType.CREATE_MANY_DATABASES_PROCEDURE.getTypeCode());
+ super.serialize(stream);
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index a4257a133c4..0ad156e8253 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.procedure.store;
import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
@@ -165,6 +166,9 @@ public class ProcedureFactory implements IProcedureFactory {
case AUTH_OPERATE_PROCEDURE:
procedure = new AuthOperationProcedure();
break;
+ case CREATE_MANY_DATABASES_PROCEDURE:
+ procedure = new CreateManyDatabasesProcedure();
+ break;
default:
LOGGER.error("unknown Procedure type: " + typeCode);
throw new IOException("unknown Procedure type: " + typeCode);
@@ -234,8 +238,11 @@ public class ProcedureFactory implements IProcedureFactory
{
return ProcedureType.ALTER_LOGICAL_VIEW_PROCEDURE;
} else if (procedure instanceof AuthOperationProcedure) {
return ProcedureType.AUTH_OPERATE_PROCEDURE;
+ } else if (procedure instanceof CreateManyDatabasesProcedure) {
+ return ProcedureType.CREATE_MANY_DATABASES_PROCEDURE;
}
- return null;
+ throw new UnsupportedOperationException(
+ "Procedure type " + procedure.getClass() + " is not supported");
}
private static class ProcedureFactoryHolder {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureStore.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureStore.java
deleted file mode 100644
index 123199d8411..00000000000
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureStore.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.store;
-
-import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.utils.TestOnly;
-import org.apache.iotdb.confignode.procedure.Procedure;
-
-import org.apache.commons.io.FileUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
-
-public class ProcedureStore implements IProcedureStore {
-
- private static final Logger LOG =
LoggerFactory.getLogger(ProcedureStore.class);
- private String procedureWalDir =
- CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
- private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new
ConcurrentHashMap<>();
- public static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
- private final IProcedureFactory procedureFactory;
- private volatile boolean isRunning = false;
-
- public ProcedureStore(IProcedureFactory procedureFactory) {
- try {
- this.procedureFactory = procedureFactory;
- Files.createDirectories(Paths.get(procedureWalDir));
- } catch (IOException e) {
- throw new RuntimeException("Create procedure wal directory failed.", e);
- }
- }
-
- @TestOnly
- public ProcedureStore(String testWALDir, IProcedureFactory procedureFactory)
{
- this.procedureFactory = procedureFactory;
- try {
- Files.createDirectories(Paths.get(testWALDir));
- procedureWalDir = testWALDir;
- } catch (IOException e) {
- throw new RuntimeException("Create procedure wal directory failed.", e);
- }
- }
-
- public boolean isRunning() {
- return this.isRunning;
- }
-
- public void setRunning(boolean running) {
- isRunning = running;
- }
-
- /**
- * Load procedure wal files into memory.
- *
- * @param procedureList procedureList
- */
- public void load(List<Procedure> procedureList) {
- try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
- s.filter(path ->
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
- .sorted(
- (p1, p2) ->
- Long.compareUnsigned(
-
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
-
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
- .forEach(
- path -> {
- String fileName = path.getFileName().toString();
- long procId = Long.parseLong(fileName.split("\\.")[0]);
- ProcedureWAL procedureWAL =
- procWALMap.computeIfAbsent(
- procId, id -> new ProcedureWAL(path,
procedureFactory));
- procedureWAL.load(procedureList);
- });
- } catch (IOException e) {
- LOG.error("Load procedure wal failed.", e);
- }
- }
-
- /**
- * Update procedure, roughly delete and create a new wal file.
- *
- * @param procedure procedure
- */
- public void update(Procedure procedure) {
- if (!procedure.needPersistance()) {
- procWALMap.remove(procedure.getProcId());
- return;
- }
- long procId = procedure.getProcId();
- Path path = Paths.get(procedureWalDir, procId +
ProcedureStore.PROCEDURE_WAL_SUFFIX);
- ProcedureWAL procedureWAL =
- procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path,
procedureFactory));
- try {
- procedureWAL.save(procedure);
- } catch (IOException e) {
- LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId());
- }
- }
-
- /**
- * Batch update
- *
- * @param subprocs procedure array
- */
- public void update(Procedure[] subprocs) {
- for (Procedure subproc : subprocs) {
- update(subproc);
- }
- }
-
- /**
- * Delete procedure wal file
- *
- * @param procId procedure id
- */
- public void delete(long procId) {
- ProcedureWAL procedureWAL = procWALMap.get(procId);
- if (procedureWAL != null) {
- procedureWAL.delete();
- }
- procWALMap.remove(procId);
- }
-
- /**
- * Batch delete
- *
- * @param childProcIds procedure id array
- */
- public void delete(long[] childProcIds) {
- for (long childProcId : childProcIds) {
- delete(childProcId);
- }
- }
-
- /**
- * Batch delete by index
- *
- * @param batchIds batchIds
- * @param startIndex start index
- * @param batchCount delete procedure count
- */
- public void delete(long[] batchIds, int startIndex, int batchCount) {
- for (int i = startIndex; i < batchCount; i++) {
- delete(batchIds[i]);
- }
- }
-
- /** clean all the wal, used for unit test. */
- public void cleanup() {
- try {
- FileUtils.cleanDirectory(new File(procedureWalDir));
- } catch (IOException e) {
- LOG.error("Clean wal directory failed", e);
- }
- }
-
- public void stop() {
- isRunning = false;
- }
-
- @Override
- public void start() {
- if (!isRunning) {
- isRunning = true;
- }
- }
-}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index b539ffc46df..f7e7c68a584 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -30,12 +30,11 @@ public enum ProcedureType {
/** DataNode */
REMOVE_DATA_NODE_PROCEDURE((short) 100),
- /** StorageGroup */
+ /** StorageGroup and Region */
DELETE_STORAGE_GROUP_PROCEDURE((short) 200),
-
- /** Region */
REGION_MIGRATE_PROCEDURE((short) 201),
CREATE_REGION_GROUPS((short) 202),
+ CREATE_MANY_DATABASES_PROCEDURE((short) 203),
/** Timeseries */
DELETE_TIMESERIES_PROCEDURE((short) 300),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 9d35842edf2..95c82a1c001 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -480,6 +480,11 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return databaseSchemaResp.convertToRPCStorageGroupSchemaResp();
}
+ @Override
+ public TSStatus createManyDatabases() throws TException {
+ return configManager.createManyDatabases();
+ }
+
@Override
public TSchemaPartitionTableResp getSchemaPartitionTable(TSchemaPartitionReq
req) {
PathPatternTree patternTree =
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/STMProcedureTest.java
similarity index 97%
rename from
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
rename to
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/STMProcedureTest.java
index 7b387d3d438..3ccbb2e1516 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestSTMProcedure.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/STMProcedureTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
import java.util.concurrent.atomic.AtomicInteger;
-public class TestSTMProcedure extends TestProcedureBase {
+public class STMProcedureTest extends TestProcedureBase {
@Test
public void testSubmitProcedure() {
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
deleted file mode 100644
index 9e1139adc30..00000000000
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/store/TestProcedureStore.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iotdb.confignode.procedure.store;
-
-import org.apache.iotdb.confignode.procedure.Procedure;
-import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
-import org.apache.iotdb.confignode.procedure.TestProcedureBase;
-import org.apache.iotdb.confignode.procedure.entity.IncProcedure;
-import org.apache.iotdb.confignode.procedure.entity.StuckSTMProcedure;
-import org.apache.iotdb.confignode.procedure.entity.TestProcedureFactory;
-import org.apache.iotdb.confignode.procedure.env.TestProcEnv;
-import org.apache.iotdb.confignode.procedure.state.ProcedureState;
-import org.apache.iotdb.confignode.procedure.util.ProcedureTestUtil;
-
-import org.apache.commons.io.FileUtils;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class TestProcedureStore extends TestProcedureBase {
-
- private static final String TEST_DIR = "./target/testWAL/";
- private static final int WORK_THREAD = 2;
- private IProcedureFactory factory = new TestProcedureFactory();
-
- @Override
- protected void initExecutor() {
- this.env = new TestProcEnv();
- this.procStore = new ProcedureStore(TEST_DIR, factory);
- this.procExecutor = new ProcedureExecutor<>(env, procStore);
- this.env.setScheduler(this.procExecutor.getScheduler());
- this.procExecutor.init(WORK_THREAD);
- }
-
- @Test
- public void testUpdate() {
- ProcedureStore procedureStore = new ProcedureStore(TEST_DIR, factory);
- IncProcedure incProcedure = new IncProcedure();
- procedureStore.update(incProcedure);
- List<Procedure> procedureList = new ArrayList<>();
- procedureStore.load(procedureList);
- assertProc(
- incProcedure,
- procedureList.get(0).getClass(),
- procedureList.get(0).getProcId(),
- procedureList.get(0).getState());
- this.procStore.cleanup();
- try {
- FileUtils.cleanDirectory(new File(TEST_DIR));
- } catch (IOException e) {
- System.out.println("clean dir failed." + e);
- }
- }
-
- @Test
- public void testChildProcedureLoad() {
- int childCount = 10;
- StuckSTMProcedure STMProcedure = new StuckSTMProcedure(childCount);
- long rootId = procExecutor.submitProcedure(STMProcedure);
- ProcedureTestUtil.sleepWithoutInterrupt(50);
- // stop service
- ProcedureTestUtil.stopService(procExecutor, procExecutor.getScheduler(),
procStore);
- ConcurrentHashMap<Long, Procedure> procedures =
procExecutor.getProcedures();
- ProcedureStore procedureStore = new ProcedureStore(TEST_DIR, new
TestProcedureFactory());
- List<Procedure> procedureList = new ArrayList<>();
- procedureStore.load(procedureList);
- Assert.assertEquals(childCount + 1, procedureList.size());
- for (int i = 0; i < procedureList.size(); i++) {
- Procedure procedure = procedureList.get(i);
- assertProc(
- procedure,
- procedures.get(procedure.getProcId()).getClass(),
- i + 1,
- procedures.get(procedure.getProcId()).getState());
- }
- // restart service
- initExecutor();
- this.procStore.start();
- this.procExecutor.startWorkers();
-
- ProcedureTestUtil.waitForProcedure(procExecutor, rootId);
- Assert.assertEquals(
- procExecutor.getResultOrProcedure(rootId).getState(),
ProcedureState.SUCCESS);
- }
-
- private void assertProc(Procedure proc, Class clazz, long procId,
ProcedureState state) {
- Assert.assertEquals(clazz, proc.getClass());
- Assert.assertEquals(procId, proc.getProcId());
- Assert.assertEquals(state, proc.getState());
- }
-}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 6cb1373fd17..1387e5f338f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -471,6 +471,12 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
() -> client.getMatchedDatabaseSchemas(req), resp ->
!updateConfigNodeLeader(resp.status));
}
+ @Override
+ public TSStatus createManyDatabases() throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.createManyDatabases(), status ->
!updateConfigNodeLeader(status));
+ }
+
@Override
public TSStatus setTTL(TSetTTLReq setTTLReq) throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
index 05ade0b21c8..089d906e91c 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/TestOnly.java
@@ -25,10 +25,10 @@ import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
-@Target({ElementType.METHOD, ElementType.CONSTRUCTOR})
-@Retention(RetentionPolicy.SOURCE)
/**
- * TestOnly implies that the method should only be used in the tests,
otherwise its functionality is
- * not guaranteed and may interfere with the normal code.
+ * TestOnly implies that the class or method should only be used in the tests,
otherwise its
+ * functionality is not guaranteed and may interfere with the normal code.
*/
+@Target({ElementType.METHOD, ElementType.CONSTRUCTOR, ElementType.TYPE})
+@Retention(RetentionPolicy.SOURCE)
public @interface TestOnly {}
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index a04cfe588a5..989255c0cc8 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -945,6 +945,9 @@ service IConfigNodeRPCService {
/** Get the matched Databases' TDatabaseSchema */
TDatabaseSchemaResp getMatchedDatabaseSchemas(TGetDatabaseReq req)
+ /** Test only */
+ common.TSStatus createManyDatabases()
+
// ======================================================
// SchemaPartition
// ======================================================