This is an automated email from the ASF dual-hosted git repository.
CRZbulabula pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b7eb1937b6d Manually trigger repair data partition (#17530)
b7eb1937b6d is described below
commit b7eb1937b6d17e0d5bc514add29b54a33cc669bc
Author: libo <[email protected]>
AuthorDate: Wed Apr 29 19:02:50 2026 +0800
Manually trigger repair data partition (#17530)
---
...ataPartitionTableIntegrityCheckProcedureIT.java | 131 +++++++++++++++++++++
.../org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4 | 6 +
.../iotdb/confignode/conf/ConfigNodeConfig.java | 11 --
.../confignode/conf/ConfigNodeDescriptor.java | 17 ---
.../iotdb/confignode/manager/ConfigManager.java | 10 ++
.../apache/iotdb/confignode/manager/IManager.java | 2 +
.../iotdb/confignode/manager/ProcedureManager.java | 11 --
.../manager/partition/PartitionManager.java | 27 +++++
.../DataPartitionTableIntegrityCheckProcedure.java | 9 ++
.../thrift/ConfigNodeRPCServiceProcessor.java | 5 +
.../iotdb/db/protocol/client/ConfigNodeClient.java | 6 +
.../execution/config/TreeConfigTaskVisitor.java | 8 ++
.../config/executor/ClusterConfigTaskExecutor.java | 21 ++++
.../config/executor/IConfigTaskExecutor.java | 2 +
.../config/sys/RepairDataPartitionTableTask.java | 37 ++++++
.../db/queryengine/plan/parser/ASTVisitor.java | 9 ++
.../security/TreeAccessCheckVisitor.java | 59 +++++++++-
.../queryengine/plan/statement/StatementType.java | 1 +
.../plan/statement/StatementVisitor.java | 6 +
.../statement/sys/RepairDataPartitionTable.java | 52 ++++++++
.../conf/iotdb-system.properties.template | 6 -
.../src/main/thrift/confignode.thrift | 2 +
22 files changed, 391 insertions(+), 47 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java
new file mode 100644
index 00000000000..e93856cacd1
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/partition/DataPartitionTableIntegrityCheckProcedureIT.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.it.partition;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class, ClusterIT.class})
+public class DataPartitionTableIntegrityCheckProcedureIT {
+ private static final Logger LOGGER =
+
LoggerFactory.getLogger(DataPartitionTableIntegrityCheckProcedureIT.class);
+
+ @Before
+ public void setUp() {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setConfigNodeConsensusProtocolClass(RATIS_CONSENSUS)
+ .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setDataRegionConsensusProtocolClass(RATIS_CONSENSUS)
+ .setDataReplicationFactor(1);
+ EnvFactory.getEnv().initClusterEnvironment(1, 1);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment();
+ }
+
+ @Test
+ public void testConcurrentSubmitDataPartitionTableIntegrityCheckProcedure()
+ throws InterruptedException {
+ final int threadCount = 10;
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch finishLatch = new CountDownLatch(threadCount);
+ final ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+
+ final AtomicInteger successCount = new AtomicInteger(0);
+ final AtomicInteger failCount = new AtomicInteger(0);
+ final List<String> failureMessages = Collections.synchronizedList(new
ArrayList<>());
+
+ // Concurrently submit the DataPartitionTableIntegrityCheckProcedure
+ for (int i = 0; i < threadCount; i++) {
+ final int threadId = i;
+ executor.submit(
+ () -> {
+ try {
+ startLatch.await();
+
+ try (final Connection connection =
EnvFactory.getEnv().getConnection();
+ final Statement stmt = connection.createStatement()) {
+ stmt.execute("REPAIR DATA PARTITION TABLE");
+ successCount.incrementAndGet();
+ LOGGER.info("Thread {} submitted integrity check
successfully", threadId);
+ }
+ } catch (final SQLException e) {
+ failCount.incrementAndGet();
+ failureMessages.add("Thread " + threadId + " failed: " +
e.getMessage());
+ LOGGER.info(
+ "Thread {} failed to submit integrity check: {}", threadId,
e.getMessage());
+ } catch (final Exception e) {
+ failCount.incrementAndGet();
+ failureMessages.add("Thread " + threadId + " failed
unexpectedly: " + e.getMessage());
+ LOGGER.error("Thread {} unexpected error: {}", threadId,
e.getMessage(), e);
+ } finally {
+ finishLatch.countDown();
+ }
+ });
+ }
+
+ startLatch.countDown();
+
+ final boolean completed = finishLatch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("Not all threads completed within timeout", completed);
+
+ executor.shutdown();
+ Assert.assertTrue(
+ "Executor did not terminate", executor.awaitTermination(10,
TimeUnit.SECONDS));
+
+ LOGGER.info("Success count: {}, Fail count: {}", successCount.get(),
failCount.get());
+ LOGGER.info("Failure messages: {}", failureMessages);
+
+ Assert.assertEquals(
+ "Only one procedure should be submitted successfully", 1,
successCount.get());
+ Assert.assertEquals(
+ "The other concurrent submissions should be rejected", threadCount -
1, failCount.get());
+ }
+}
diff --git
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
index 31d1007867a..c6271c134cb 100644
---
a/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
+++
b/iotdb-core/antlr/src/main/antlr4/org/apache/iotdb/db/qp/sql/IoTDBSqlParser.g4
@@ -94,6 +94,7 @@ utilityStatement
| showQueries | showDiskUsage | showCurrentTimestamp | killQuery |
grantWatermarkEmbedding
| revokeWatermarkEmbedding | loadConfiguration | loadTimeseries | loadFile
| removeFile | unloadFile | setSqlDialect | showCurrentSqlDialect |
showCurrentUser
+ | repairDataPartitionTable
;
/**
@@ -1238,6 +1239,11 @@ stopRepairData
: STOP REPAIR DATA (ON (LOCAL | CLUSTER))?
;
+// Repair Data Partition Table
+repairDataPartitionTable
+ : REPAIR DATA PARTITION TABLE
+ ;
+
// Explain
explain
: EXPLAIN (ANALYZE VERBOSE?)? selectStatement?
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
index 3f92340d431..f305b06398d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeConfig.java
@@ -320,8 +320,6 @@ public class ConfigNodeConfig {
private long forceWalPeriodForConfigNodeSimpleInMs = 100;
- private long partitionTableRecoverWaitAllDnUpTimeoutInMs = 60000;
-
public ConfigNodeConfig() {
// empty constructor
}
@@ -1290,13 +1288,4 @@ public class ConfigNodeConfig {
public void setFailureDetectorPhiAcceptablePauseInMs(long
failureDetectorPhiAcceptablePauseInMs) {
this.failureDetectorPhiAcceptablePauseInMs =
failureDetectorPhiAcceptablePauseInMs;
}
-
- public long getPartitionTableRecoverWaitAllDnUpTimeoutInMs() {
- return partitionTableRecoverWaitAllDnUpTimeoutInMs;
- }
-
- public void setPartitionTableRecoverWaitAllDnUpTimeoutInMs(
- long partitionTableRecoverWaitAllDnUpTimeoutInMs) {
- this.partitionTableRecoverWaitAllDnUpTimeoutInMs =
partitionTableRecoverWaitAllDnUpTimeoutInMs;
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
index dd32415ebe0..77790dae1a9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -322,23 +322,6 @@ public class ConfigNodeDescriptor {
"failure_detector_phi_acceptable_pause_in_ms",
String.valueOf(conf.getFailureDetectorPhiAcceptablePauseInMs()))));
- long partitionTableRecoverWaitAllDnUpTimeoutInMs =
- Long.parseLong(
- properties.getProperty(
- "partition_table_recover_wait_all_dn_up_timeout_ms",
-
String.valueOf(conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs())));
- if (partitionTableRecoverWaitAllDnUpTimeoutInMs <= 0) {
- LOGGER.warn(
- "partition_table_recover_wait_all_dn_up_timeout_ms should be greater
than 0, "
- + "but current value is {}, ignore that and use the default
value {}",
- partitionTableRecoverWaitAllDnUpTimeoutInMs,
- conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
- partitionTableRecoverWaitAllDnUpTimeoutInMs =
- conf.getPartitionTableRecoverWaitAllDnUpTimeoutInMs();
- }
- conf.setPartitionTableRecoverWaitAllDnUpTimeoutInMs(
- partitionTableRecoverWaitAllDnUpTimeoutInMs);
-
String leaderDistributionPolicy =
properties.getProperty("leader_distribution_policy",
conf.getLeaderDistributionPolicy());
if (AbstractLeaderBalancer.GREEDY_POLICY.equals(leaderDistributionPolicy)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index b13acd8d958..327f842e966 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -1157,6 +1157,16 @@ public class ConfigManager implements IManager {
return resp;
}
+ @Override
+ public TSStatus dataPartitionTableIntegrityCheck() {
+ TSStatus status = confirmLeader();
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ return status;
+ }
+
+ return partitionManager.dataPartitionTableIntegrityCheck();
+ }
+
private void printNewCreatedDataPartition(
GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan,
TDataPartitionTableResp resp) {
final String lineSeparator = System.lineSeparator();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index 50601dff9ac..4dce39a9e98 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -477,6 +477,8 @@ public interface IManager {
TDataPartitionTableResp getOrCreateDataPartition(
GetOrCreateDataPartitionPlan getOrCreateDataPartitionPlan);
+ TSStatus dataPartitionTableIntegrityCheck();
+
/**
* Get AuditLogger.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index a9457f23964..3de0f4247d8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -69,7 +69,6 @@ import
org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveConfigNodeProcedure;
import
org.apache.iotdb.confignode.procedure.impl.node.RemoveDataNodesProcedure;
-import
org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.plugin.CreatePipePluginProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.plugin.DropPipePluginProcedure;
import
org.apache.iotdb.confignode.procedure.impl.pipe.runtime.PipeHandleLeaderChangeProcedure;
@@ -1376,16 +1375,6 @@ public class ProcedureManager {
}
}
- /** Used to repair the lost data partition table */
- public TSStatus dataPartitionTableIntegrityCheck() {
- DataPartitionTableIntegrityCheckProcedure procedure;
- synchronized (this) {
- procedure = new DataPartitionTableIntegrityCheckProcedure();
- executor.submitProcedure(procedure);
- }
- return waitingProcedureFinished(procedure, 86400000);
- }
-
/**
* Generate {@link CreateTriggerProcedure} and wait until it finished.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
index 607d670006c..da7a19cfdb5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/partition/PartitionManager.java
@@ -83,6 +83,7 @@ import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionCreate
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionDeleteTask;
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainTask;
import
org.apache.iotdb.confignode.persistence.partition.maintainer.RegionMaintainType;
+import
org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TCountTimeSlotListReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionGroupsByTimeReq;
import org.apache.iotdb.confignode.rpc.thrift.TGetRegionIdReq;
@@ -155,6 +156,9 @@ public class PartitionManager {
private final ScheduledExecutorService regionMaintainer;
private Future<?> currentRegionMaintainerFuture;
+ private final AtomicBoolean dataPartitionTableIntegrityCheckProcedureRunning
=
+ new AtomicBoolean(false);
+
public PartitionManager(IManager configManager, PartitionInfo partitionInfo)
{
this.configManager = configManager;
this.partitionInfo = partitionInfo;
@@ -514,6 +518,29 @@ public class PartitionManager {
return resp;
}
+ /** Used to repair the lost data partition table */
+ public TSStatus dataPartitionTableIntegrityCheck() {
+ if (configManager
+ .getProcedureManager()
+
.isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class)
+ ||
!dataPartitionTableIntegrityCheckProcedureRunning.compareAndSet(false, true)) {
+ return RpcUtils.getStatus(
+ TSStatusCode.OVERLAP_WITH_EXISTING_TASK,
+ "DataPartitionTableIntegrityCheckProcedure is already submitted.");
+ }
+
+ synchronized (this) {
+ DataPartitionTableIntegrityCheckProcedure procedure =
+ new DataPartitionTableIntegrityCheckProcedure();
+ getProcedureManager().getExecutor().submitProcedure(procedure);
+ }
+ return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ }
+
+ public void markDataPartitionTableIntegrityCheckProcedureFinished() {
+ dataPartitionTableIntegrityCheckProcedureRunning.set(false);
+ }
+
private TSStatus consensusWritePartitionResult(ConfigPhysicalPlan plan) {
TSStatus status = getConsensusManager().confirmLeader();
if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index 4f2c6933fd8..f3d539576d4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -131,6 +131,15 @@ public class DataPartitionTableIntegrityCheckProcedure
super();
}
+ @Override
+ protected void updateMetricsOnFinish(
+ final ConfigNodeProcedureEnv env, final long runtime, final boolean
success) {
+ super.updateMetricsOnFinish(env, runtime, success);
+ env.getConfigManager()
+ .getPartitionManager()
+ .markDataPartitionTableIntegrityCheckProcedureFinished();
+ }
+
@Override
protected Flow executeFromState(
final ConfigNodeProcedureEnv env, final
DataPartitionTableIntegrityCheckProcedureState state)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 52f330bcdce..4dac0ea8e34 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -626,6 +626,11 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return configManager.getOrCreateDataPartition(getOrCreateDataPartitionReq);
}
+ @Override
+ public TSStatus dataPartitionTableIntegrityCheck() {
+ return configManager.dataPartitionTableIntegrityCheck();
+ }
+
@Override
public TSStatus operatePermission(final TAuthorizerReq req) {
ConfigPhysicalPlanType configPhysicalPlanType =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index e0001c448bb..c552bafc14a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -702,6 +702,12 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
resp -> !updateConfigNodeLeader(resp.status));
}
+ @Override
+ public TSStatus dataPartitionTableIntegrityCheck() throws TException {
+ return executeRemoteCallWithRetry(
+ () -> client.dataPartitionTableIntegrityCheck(), status ->
!updateConfigNodeLeader(status));
+ }
+
@Override
public TSStatus operatePermission(TAuthorizerReq req) throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
index 7dd36850bb1..de090b31f48 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/TreeConfigTaskVisitor.java
@@ -105,6 +105,7 @@ import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.FlushTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.sys.KillQueryTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.LoadConfigurationTask;
import org.apache.iotdb.db.queryengine.plan.execution.config.sys.MergeTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.RepairDataPartitionTableTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetConfigurationTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.SetSystemStatusTask;
import
org.apache.iotdb.db.queryengine.plan.execution.config.sys.ShowConfigurationTask;
@@ -213,6 +214,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
@@ -384,6 +386,12 @@ public class TreeConfigTaskVisitor extends
StatementVisitor<IConfigTask, MPPQuer
return new StartRepairDataTask(startRepairDataStatement);
}
+ @Override
+ public IConfigTask visitRepairDataPartitionTable(
+ RepairDataPartitionTable repairDataPartitionTable, MPPQueryContext
context) {
+ return new RepairDataPartitionTableTask();
+ }
+
@Override
public IConfigTask visitStopRepairData(
StopRepairDataStatement stopRepairDataStatement, MPPQueryContext
context) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
index 87c3c1428d8..659b24b2767 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/ClusterConfigTaskExecutor.java
@@ -1458,6 +1458,27 @@ public class ClusterConfigTaskExecutor implements
IConfigTaskExecutor {
return future;
}
+ @Override
+ public SettableFuture<ConfigTaskResult> repairDataPartitionTable() {
+ SettableFuture<ConfigTaskResult> future = SettableFuture.create();
+ TSStatus tsStatus = new TSStatus();
+
+ try (ConfigNodeClient client =
+
CONFIG_NODE_CLIENT_MANAGER.borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ // Send request to ConfigNode to trigger
DataPartitionTableIntegrityCheckProcedure
+ tsStatus = client.dataPartitionTableIntegrityCheck();
+ } catch (ClientManagerException | TException e) {
+ future.setException(e);
+ }
+
+ if (tsStatus.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ future.set(new ConfigTaskResult(TSStatusCode.SUCCESS_STATUS));
+ } else {
+ future.setException(new IoTDBException(tsStatus));
+ }
+ return future;
+ }
+
@Override
public SettableFuture<ConfigTaskResult> loadConfiguration(boolean onCluster)
{
SettableFuture<ConfigTaskResult> future = SettableFuture.create();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
index 32f63d61a27..b4b928ba0b6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/executor/IConfigTaskExecutor.java
@@ -156,6 +156,8 @@ public interface IConfigTaskExecutor {
SettableFuture<ConfigTaskResult> stopRepairData(boolean onCluster);
+ SettableFuture<ConfigTaskResult> repairDataPartitionTable();
+
SettableFuture<ConfigTaskResult> flush(TFlushReq tFlushReq, boolean
onCluster);
SettableFuture<ConfigTaskResult> clearCache(boolean onCluster,
Set<CacheClearOptions> options);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/RepairDataPartitionTableTask.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/RepairDataPartitionTableTask.java
new file mode 100644
index 00000000000..f3675e9a0d9
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/config/sys/RepairDataPartitionTableTask.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.execution.config.sys;
+
+import org.apache.iotdb.db.queryengine.plan.execution.config.ConfigTaskResult;
+import org.apache.iotdb.db.queryengine.plan.execution.config.IConfigTask;
+import
org.apache.iotdb.db.queryengine.plan.execution.config.executor.IConfigTaskExecutor;
+
+import com.google.common.util.concurrent.ListenableFuture;
+
+public class RepairDataPartitionTableTask implements IConfigTask {
+
+ public RepairDataPartitionTableTask() {}
+
+ @Override
+ public ListenableFuture<ConfigTaskResult> execute(IConfigTaskExecutor
configTaskExecutor)
+ throws InterruptedException {
+ return configTaskExecutor.repairDataPartitionTable();
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
index 39c52a5c498..28e935d5047 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/parser/ASTVisitor.java
@@ -241,6 +241,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
@@ -3756,6 +3757,14 @@ public class ASTVisitor extends
IoTDBSqlParserBaseVisitor<Statement> {
return startRepairDataStatement;
}
+ // Repair Data Partition Table
+
+ @Override
+ public Statement visitRepairDataPartitionTable(
+ IoTDBSqlParser.RepairDataPartitionTableContext ctx) {
+ return new RepairDataPartitionTable();
+ }
+
// Stop Repair Data
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
index 95bb70872ff..7b4655c79a8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/security/TreeAccessCheckVisitor.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.relational.security;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.audit.AuditEventType;
import org.apache.iotdb.commons.audit.AuditLogOperation;
import org.apache.iotdb.commons.audit.IAuditEntity;
import org.apache.iotdb.commons.auth.AuthException;
@@ -147,6 +148,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
@@ -188,8 +190,8 @@ public class TreeAccessCheckVisitor extends
StatementVisitor<TSStatus, TreeAcces
private static final DNAuditLogger AUDIT_LOGGER =
DNAuditLogger.getInstance();
- private static final String OBJECT_AUTHENTICATION_AUDIT_STR =
- "User %s (ID=%d) requests authority on object %s with result %s";
+ private static final String OPERATION_AUDIT_STR =
+ "User %s (ID=%d) requests authority on the %s operation";
@Override
public TSStatus visitNode(StatementNode node, TreeAccessCheckContext
context) {
@@ -1681,6 +1683,15 @@ public class TreeAccessCheckVisitor extends
StatementVisitor<TSStatus, TreeAcces
context.setAuditLogOperation(AuditLogOperation.CONTROL),
PrivilegeType.SYSTEM, () -> "");
}
+ @Override
+ public TSStatus visitRepairDataPartitionTable(
+ RepairDataPartitionTable repairDataPartitionTable,
TreeAccessCheckContext context) {
+ return checkGlobalAuth(
+ context.setAuditLogOperation(AuditLogOperation.CONTROL),
+ PrivilegeType.SYSTEM,
+ AuditEventType.INTEGRITY_CHECK);
+ }
+
@Override
public TSStatus visitStopRepairData(
StopRepairDataStatement stopRepairDataStatement, TreeAccessCheckContext
context) {
@@ -2017,6 +2028,7 @@ public class TreeAccessCheckVisitor extends
StatementVisitor<TSStatus, TreeAcces
return checkGlobalAuth(context, PrivilegeType.SYSTEM, auditObject);
}
+ /** The method will record object authentication audit log for auditObject*
*/
protected TSStatus checkGlobalAuth(
IAuditEntity context, PrivilegeType requiredPrivilege, Supplier<String>
auditObject) {
if (checkHasGlobalAuth(context, requiredPrivilege, auditObject)) {
@@ -2029,11 +2041,54 @@ public class TreeAccessCheckVisitor extends
StatementVisitor<TSStatus, TreeAcces
return result;
}
+ protected TSStatus checkGlobalAuth(
+ IAuditEntity context, PrivilegeType requiredPrivilege, AuditEventType
auditEventType) {
+ if (checkHasGlobalAuth(context, requiredPrivilege, auditEventType, false))
{
+ return SUCCEED;
+ }
+ TSStatus result = AuthorityChecker.getTSStatus(false, requiredPrivilege);
+ IAuditEntity auditEntity =
+ context.setResult(result.getCode() ==
TSStatusCode.SUCCESS_STATUS.getStatusCode());
+ AUDIT_LOGGER.log(
+ auditEntity.setAuditEventType(auditEventType),
+ () ->
+ String.format(
+ OPERATION_AUDIT_STR,
+ auditEntity.getUsername(),
+ auditEntity.getUserId(),
+ auditEventType));
+ return result;
+ }
+
protected boolean checkHasGlobalAuth(
IAuditEntity context, PrivilegeType requiredPrivilege, Supplier<String>
auditObject) {
return checkHasGlobalAuth(context, requiredPrivilege, auditObject, false);
}
+ protected boolean checkHasGlobalAuth(
+ IAuditEntity context,
+ PrivilegeType requiredPrivilege,
+ AuditEventType auditEventType,
+ boolean checkGrantOption) {
+ boolean result =
+ AuthorityChecker.SUPER_USER.equals(context.getUsername())
+ || (checkGrantOption
+ ? AuthorityChecker.checkSystemPermissionGrantOption(
+ context.getUsername(), requiredPrivilege)
+ :
AuthorityChecker.checkSystemPermission(context.getUsername(),
requiredPrivilege));
+
+ IAuditEntity auditEntity = context.setResult(result);
+ AUDIT_LOGGER.log(
+ auditEntity.setAuditEventType(auditEventType),
+ () ->
+ String.format(
+ OPERATION_AUDIT_STR,
+ auditEntity.getUsername(),
+ auditEntity.getUserId(),
+ auditEventType));
+ return result;
+ }
+
protected boolean checkHasGlobalAuth(
IAuditEntity context,
PrivilegeType requiredPrivilege,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
index 19e597ef031..b40c6444816 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementType.java
@@ -181,6 +181,7 @@ public enum StatementType {
PIPE_ENRICHED,
START_REPAIR_DATA,
STOP_REPAIR_DATA,
+ REPAIR_DATA_PARTITION_TABLE,
CREATE_TOPIC,
DROP_TOPIC,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
index 3617c4bae8d..847e850c521 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/StatementVisitor.java
@@ -138,6 +138,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.sys.FlushStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.KillQueryStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.LoadConfigurationStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.MergeStatement;
+import
org.apache.iotdb.db.queryengine.plan.statement.sys.RepairDataPartitionTable;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetConfigurationStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSqlDialectStatement;
import
org.apache.iotdb.db.queryengine.plan.statement.sys.SetSystemStatusStatement;
@@ -520,6 +521,11 @@ public abstract class StatementVisitor<R, C> {
return visitStatement(stopRepairDataStatement, context);
}
+ public R visitRepairDataPartitionTable(
+ RepairDataPartitionTable repairDataPartitionTable, C context) {
+ return visitStatement(repairDataPartitionTable, context);
+ }
+
public R visitLoadConfiguration(
LoadConfigurationStatement loadConfigurationStatement, C context) {
return visitStatement(loadConfigurationStatement, context);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/RepairDataPartitionTable.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/RepairDataPartitionTable.java
new file mode 100644
index 00000000000..b377952ac5b
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/statement/sys/RepairDataPartitionTable.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.statement.sys;
+
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.statement.IConfigStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementType;
+import org.apache.iotdb.db.queryengine.plan.statement.StatementVisitor;
+
+import java.util.Collections;
+import java.util.List;
+
+public class RepairDataPartitionTable extends Statement implements
IConfigStatement {
+
+ public RepairDataPartitionTable() {
+ this.statementType = StatementType.REPAIR_DATA_PARTITION_TABLE;
+ }
+
+ @Override
+ public List<PartialPath> getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public QueryType getQueryType() {
+ return QueryType.OTHER;
+ }
+
+ @Override
+ public <R, C> R accept(StatementVisitor<R, C> visitor, C context) {
+ return visitor.visitRepairDataPartitionTable(this, context);
+ }
+}
diff --git
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
index 378a6226cbf..87be30f4520 100644
---
a/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
+++
b/iotdb-core/node-commons/src/assembly/resources/conf/iotdb-system.properties.template
@@ -739,12 +739,6 @@ partition_table_recover_worker_num=10
# Datatype: Integer
partition_table_recover_max_read_megabytes_per_second=10
-# Purpose: for data partition repair
-# Set a timeout to wait for all datanodes complete startup, the unit is ms
-# effectiveMode: restart
-# Datatype: Integer
-partition_table_recover_wait_all_dn_up_timeout_ms=60000
-
####################
### Memory Control Configuration
####################
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index b1af203f4a1..22529ffbb73 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -1498,6 +1498,8 @@ service IConfigNodeRPCService {
*/
TDataPartitionTableResp getOrCreateDataPartitionTable(TDataPartitionReq req)
+ common.TSStatus dataPartitionTableIntegrityCheck()
+
// ======================================================
// Authorize
// ======================================================