This is an automated email from the ASF dual-hosted git repository.
yongzao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b4ce8dab040 Fix inability to use currentGeneratorFuture and
currentGenerator to get the current progress if the DN is restarted before
data partition generation completes (#17491)
b4ce8dab040 is described below
commit b4ce8dab040c39d68f877348669ac1bf78b506ff
Author: libo <[email protected]>
AuthorDate: Thu Apr 16 14:15:38 2026 +0800
Fix inability to use currentGeneratorFuture and currentGenerator to get the
current progress if the DN is restarted before data partition generation
completes (#17491)
---
.../client/sync/SyncDataNodeClientPool.java | 3 +-
.../iotdb/confignode/manager/ConfigManager.java | 6 +-
.../iotdb/confignode/manager/ProcedureManager.java | 20 +++++++
.../manager/consensus/ConsensusManager.java | 19 +++++++
.../DataPartitionTableIntegrityCheckProcedure.java | 4 +-
.../iotdb/confignode/service/ConfigNode.java | 66 +++++++++++++---------
.../impl/DataNodeInternalRPCServiceImpl.java | 14 +++--
.../src/main/thrift/datanode.thrift | 2 +-
8 files changed, 95 insertions(+), 39 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
index 9f5729ef06d..b9cf775459c 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/sync/SyncDataNodeClientPool.java
@@ -148,7 +148,8 @@ public class SyncDataNodeClientPool {
(req, client) ->
client.generateDataPartitionTable((TGenerateDataPartitionTableReq) req));
actionMapBuilder.put(
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
- (req, client) -> client.generateDataPartitionTableHeartbeat());
+ (req, client) ->
+
client.generateDataPartitionTableHeartbeat((TGenerateDataPartitionTableReq)
req));
actionMap = actionMapBuilder.build();
}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index f455edb26b8..182dc2f9fb2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -443,15 +443,15 @@ public class ConfigManager implements IManager {
}
public void close() throws IOException {
- if (consensusManager.get() != null) {
- consensusManager.get().close();
- }
if (partitionManager != null) {
partitionManager.getRegionMaintainer().shutdown();
}
if (procedureManager != null) {
procedureManager.stopExecutor();
}
+ if (consensusManager.get() != null) {
+ consensusManager.get().close();
+ }
}
@Override
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 4d420cf4f3c..2c5a77303d9 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -62,6 +62,7 @@ import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.env.RemoveDataNodeHandler;
+import org.apache.iotdb.confignode.procedure.impl.StateMachineProcedure;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.RemoveAINodeProcedure;
@@ -2329,6 +2330,25 @@ public class ProcedureManager {
return new Pair<>(-1L, false);
}
+ public boolean isExistUnfinishedProcedure(
+ Class<? extends StateMachineProcedure<?, ?>> procedureClass) {
+ if (procedureClass == null) {
+ return false;
+ }
+
+ for (Procedure<ConfigNodeProcedureEnv> procedure :
getExecutor().getProcedures().values()) {
+ if (!procedure.isFinished() && procedureClass.isInstance(procedure)) {
+ LOGGER.info(
+ "[{}] procedure details are {}",
+ procedureClass.getSimpleName(),
+ procedure.toStringDetails());
+ return true;
+ }
+ }
+
+ return false;
+ }
+
// ======================================================
/*
GET-SET Region
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
index a4436a94fa2..51773d83923 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java
@@ -388,6 +388,25 @@ public class ConsensusManager {
return null;
}
+ public TConfigNodeLocation getNotNullLeaderLocation() {
+ Peer leaderPeer = getLeaderPeer();
+
+ while (leaderPeer == null) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ignored) {
+
+ }
+ leaderPeer = getLeaderPeer();
+ }
+
+ Peer finalLeaderPeer = leaderPeer;
+ return getNodeManager().getRegisteredConfigNodes().stream()
+ .filter(leader -> leader.getConfigNodeId() ==
finalLeaderPeer.getNodeId())
+ .findFirst()
+ .orElse(null);
+ }
+
/**
* @return true if ConfigNode-leader is elected, false otherwise.
*/
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
index c95c1ec9072..4f2c6933fd8 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/partition/DataPartitionTableIntegrityCheckProcedure.java
@@ -521,11 +521,13 @@ public class DataPartitionTableIntegrityCheckProcedure
if (!dataPartitionTables.containsKey(dataNodeId)) {
try {
+ TGenerateDataPartitionTableReq req = new
TGenerateDataPartitionTableReq();
+ req.setDatabases(databasesWithLostDataPartition);
Object response =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithGivenRetry(
dataNode.getLocation().getInternalEndPoint(),
- null,
+ req,
CnToDnSyncRequestType.GENERATE_DATA_PARTITION_TABLE_HEART_BEAT,
MAX_RETRY_COUNT);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 3a6c93b12ec..5caac7127f7 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -55,6 +55,7 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.consensus.ConsensusManager;
import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
import org.apache.iotdb.confignode.manager.pipe.metric.PipeConfigNodeMetrics;
+import
org.apache.iotdb.confignode.procedure.impl.partition.DataPartitionTableIntegrityCheckProcedure;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
@@ -195,6 +196,9 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
configManager.initConsensusManager();
upgrade();
TConfigNodeLocation leaderNodeLocation = waitForLeaderElected();
+ if (leaderNodeLocation == null) {
+ leaderNodeLocation =
configManager.getConsensusManager().getNotNullLeaderLocation();
+ }
setUpMetricService();
// Notice: We always set up Seed-ConfigNode's RPC service lastly to
ensure
// that the external service is not provided until ConfigNode is fully
available
@@ -225,36 +229,42 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean {
/* After the ConfigNode leader election, a leader switch may occur,
which could cause the procedure not to be created. This can happen if the
original leader has not yet executed the procedure creation, while the other
followers have already finished starting up. Therefore, having the original
leader (before the leader switch) initiate the process ensures that only one
procedure will be created. */
if (leaderNodeLocation.getConfigNodeId() == configNodeId) {
- dataPartitionTableCheckFuture =
- dataPartitionTableCheckExecutor.submit(
- () -> {
- LOGGER.info(
- "[DataPartitionIntegrity] Prepare to start
dataPartitionTableIntegrityCheck after all datanodes started up");
-
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
-
- while (true) {
- List<Integer> dnList =
- configManager
- .getLoadManager()
- .filterDataNodeThroughStatus(NodeStatus.Running);
- if (dnList != null && !dnList.isEmpty()) {
- LOGGER.info("Starting
dataPartitionTableIntegrityCheck...");
- TSStatus status =
-
configManager.getProcedureManager().dataPartitionTableIntegrityCheck();
- if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- LOGGER.error(
- "Data partition table integrity check failed!
Current status code is {}, status message is {}",
- status.getCode(),
- status.getMessage());
+ if (!configManager
+ .getProcedureManager()
+
.isExistUnfinishedProcedure(DataPartitionTableIntegrityCheckProcedure.class)) {
+ dataPartitionTableCheckFuture =
+ dataPartitionTableCheckExecutor.submit(
+ () -> {
+ LOGGER.info(
+ "[DataPartitionIntegrity] Prepare to start
dataPartitionTableIntegrityCheck after all datanodes started up");
+
Thread.sleep(CONF.getPartitionTableRecoverWaitAllDnUpTimeoutInMs());
+
+ while (true) {
+ List<Integer> dnList =
+ configManager
+ .getLoadManager()
+
.filterDataNodeThroughStatus(NodeStatus.Running);
+ if (dnList != null && !dnList.isEmpty()) {
+ LOGGER.info("Starting
dataPartitionTableIntegrityCheck...");
+ TSStatus status =
+ configManager
+ .getProcedureManager()
+ .dataPartitionTableIntegrityCheck();
+ if (status.getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ LOGGER.error(
+ "Data partition table integrity check failed!
Current status code is {}, status message is {}",
+ status.getCode(),
+ status.getMessage());
+ }
+ break;
+ } else {
+ LOGGER.info("No running datanodes found,
waiting...");
+ Thread.sleep(5000);
}
- break;
- } else {
- LOGGER.info("No running datanodes found, waiting...");
- Thread.sleep(5000);
}
- }
- return null;
- });
+ return null;
+ });
+ }
}
return;
} else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index f5cdb47876a..9fc5fcb436c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -3228,7 +3228,8 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
}
@Override
- public TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat() {
+ public TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat(
+ TGenerateDataPartitionTableReq req) {
TGenerateDataPartitionTableHeartbeatResp resp = new
TGenerateDataPartitionTableHeartbeatResp();
// Must be lower than the RPC request timeout, in milliseconds
final long timeoutMs = 50000;
@@ -3238,10 +3239,13 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
// To resolve this situation that the DataNode is registered and didn't
request
// generateDataPartitionTable interface yet.
if (currentGeneratorFuture == null || currentGenerator == null) {
- resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
- resp.setMessage("No DataPartitionTable generation task found");
- resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
- return resp;
+ generateDataPartitionTable(req);
+ if (currentGeneratorFuture == null || currentGenerator == null) {
+
resp.setErrorCode(DataPartitionTableGeneratorState.UNKNOWN.getCode());
+ resp.setMessage("No DataPartitionTable generation task found");
+
resp.setStatus(RpcUtils.getStatus(TSStatusCode.INTERNAL_SERVER_ERROR));
+ return resp;
+ }
}
currentGeneratorFuture.get(timeoutMs, TimeUnit.MILLISECONDS);
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 8c3e12217e0..4323e956a8e 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -1330,7 +1330,7 @@ service IDataNodeRPCService {
/**
* Check the status of DataPartitionTable generation task
*/
- TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat()
+ TGenerateDataPartitionTableHeartbeatResp
generateDataPartitionTableHeartbeat(TGenerateDataPartitionTableReq req)
/**
* END: Data Partition Table Integrity Check