This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 8889255c0c4 Load: Implement region operations cache for load × region
migration detection (#15210)
8889255c0c4 is described below
commit 8889255c0c48e40faf6e17346587364fa1162a4a
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Mar 28 23:18:16 2025 +0800
Load: Implement region operations cache for load × region migration
detection (#15210)
Co-authored-by: Steve Yurong Su <[email protected]>
---
.../iotdb/confignode/manager/ProcedureManager.java | 43 +++++++++-----
.../manager/load/service/HeartbeatService.java | 12 ++++
.../procedure/env/ConfigNodeProcedureEnv.java | 11 +++-
.../impl/region/RegionMigrateProcedure.java | 8 +--
.../impl/DataNodeInternalRPCServiceImpl.java | 9 +++
.../plan/scheduler/load/LoadTsFileScheduler.java | 5 +-
.../iotdb/db/service/RegionMigrateService.java | 69 ++++++++++++++++++----
.../src/main/thrift/datanode.thrift | 9 ++-
8 files changed, 133 insertions(+), 33 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index e47ed4abc69..74522357061 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -167,6 +167,7 @@ import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class ProcedureManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(ProcedureManager.class);
@@ -709,16 +710,19 @@ public class ProcedureManager {
TDataNodeLocation originalDataNode,
TDataNodeLocation destDataNode,
TDataNodeLocation coordinatorForAddPeer) {
- String failMessage =
- regionOperationCommonCheck(
- regionGroupId,
- destDataNode,
- Arrays.asList(
- new Pair<>("Original DataNode", originalDataNode),
- new Pair<>("Destination DataNode", destDataNode),
- new Pair<>("Coordinator for add peer", coordinatorForAddPeer)),
- migrateRegionReq.getModel());
- if (configManager
+ String failMessage;
+ if ((failMessage =
+ regionOperationCommonCheck(
+ regionGroupId,
+ destDataNode,
+ Arrays.asList(
+ new Pair<>("Original DataNode", originalDataNode),
+ new Pair<>("Destination DataNode", destDataNode),
+ new Pair<>("Coordinator for add peer",
coordinatorForAddPeer)),
+ migrateRegionReq.getModel()))
+ != null) {
+ // do nothing
+ } else if (configManager
.getPartitionManager()
.getAllReplicaSets(originalDataNode.getDataNodeId())
.stream()
@@ -955,10 +959,7 @@ public class ProcedureManager {
private String checkRegionOperationDuplication(TConsensusGroupId regionId) {
List<? extends RegionOperationProcedure<?>>
otherRegionMemberChangeProcedures =
- getExecutor().getProcedures().values().stream()
- .filter(procedure -> !procedure.isFinished())
- .filter(procedure -> procedure instanceof RegionOperationProcedure)
- .map(procedure -> (RegionOperationProcedure<?>) procedure)
+ getRegionOperationProcedures()
.filter(
regionMemberChangeProcedure ->
regionId.equals(regionMemberChangeProcedure.getRegionId()))
@@ -971,6 +972,20 @@ public class ProcedureManager {
return null;
}
+ /**
+ * Collects the region (consensus group) ids targeted by region operation procedures that have
+ * not finished yet, with duplicates removed.
+ *
+ * @return a newly collected list of distinct {@link TConsensusGroupId}s; empty when no
+ *     unfinished region operation procedure exists
+ */
+ public List<TConsensusGroupId> getRegionOperationConsensusIds() {
+ return getRegionOperationProcedures()
+ .map(RegionOperationProcedure::getRegionId)
+ .distinct()
+ .collect(Collectors.toList());
+ }
+
+ /**
+ * Builds a fresh stream over the executor's procedures, keeping only those that are not
+ * finished and are instances of {@link RegionOperationProcedure}.
+ *
+ * @return a new stream on every call; like any stream it can be consumed only once
+ */
+ private Stream<RegionOperationProcedure<?>> getRegionOperationProcedures() {
+ return getExecutor().getProcedures().values().stream()
+ .filter(procedure -> !procedure.isFinished())
+ .filter(procedure -> procedure instanceof RegionOperationProcedure)
+ .map(procedure -> (RegionOperationProcedure<?>) procedure);
+ }
+
private String checkRegionOperationModelCorrectness(TConsensusGroupId
regionId, Model model) {
String databaseName =
configManager.getPartitionManager().getRegionDatabase(regionId);
boolean isTreeModelDatabase =
databaseName.startsWith(SqlConstant.TREE_MODEL_DATABASE_PREFIX);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index ee6bed4c9e9..9e630733025 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -41,6 +41,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import
org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.NodeManager;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
import org.apache.tsfile.utils.Pair;
@@ -141,6 +142,11 @@ public class HeartbeatService {
/* Generate heartbeat request */
TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq();
heartbeatReq.setHeartbeatTimestamp(System.nanoTime());
+ heartbeatReq.setLogicalClock(
+ configManager
+ .getConsensusManager()
+ .getConsensusImpl()
+ .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID));
// Always sample RegionGroups' leadership as the Region heartbeat
heartbeatReq.setNeedJudgeLeader(true);
// We sample DataNode's load in every 10 heartbeat loop
@@ -169,6 +175,12 @@ public class HeartbeatService {
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
}
+ // We broadcast region operations list every 100 heartbeat loops
+ if (heartbeatCounter.get() % 100 == 0) {
+ heartbeatReq.setCurrentRegionOperations(
+
configManager.getProcedureManager().getRegionOperationConsensusIds());
+ }
+
/* Update heartbeat counter */
heartbeatCounter.getAndIncrement();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 246fbad29b0..bf6e46ae0ff 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -59,6 +59,7 @@ import
org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
@@ -486,7 +487,15 @@ public class ConfigNodeProcedureEnv {
final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
configManager.getNodeManager().getRegisteredDataNodeLocations();
final TNotifyRegionMigrationReq request =
- new TNotifyRegionMigrationReq(consensusGroupId, isStart);
+ new TNotifyRegionMigrationReq(
+ configManager
+ .getConsensusManager()
+ .getConsensusImpl()
+ .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID),
+ System.nanoTime(),
+
configManager.getProcedureManager().getRegionOperationConsensusIds());
+ request.setRegionId(consensusGroupId);
+ request.setIsStart(isStart);
final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus>
clientHandler =
new DataNodeAsyncRequestContext<>(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index 1acdd2ccd54..3d70f04f76d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -81,8 +81,8 @@ public class RegionMigrateProcedure extends
RegionOperationProcedure<RegionTrans
"[pid{}][MigrateRegion] started, {} will be migrated from
DataNode {} to {}.",
getProcId(),
regionId,
- handler.simplifiedLocation(originalDataNode),
- handler.simplifiedLocation(destDataNode));
+ RegionMaintainHandler.simplifiedLocation(originalDataNode),
+ RegionMaintainHandler.simplifiedLocation(destDataNode));
addChildProcedure(new NotifyRegionMigrationProcedure(regionId,
true));
setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
@@ -120,8 +120,8 @@ public class RegionMigrateProcedure extends
RegionOperationProcedure<RegionTrans
getProcId(),
cleanHint,
regionId,
- handler.simplifiedLocation(originalDataNode),
- handler.simplifiedLocation(destDataNode),
+ RegionMaintainHandler.simplifiedLocation(originalDataNode),
+ RegionMaintainHandler.simplifiedLocation(destDataNode),
CommonDateTimeUtils.convertMillisecondToDurationStr(
System.currentTimeMillis() - getSubmittedTime()),
DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 96e6fae17c5..47fa540f51b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1949,6 +1949,15 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
}
+ if (req.isSetCurrentRegionOperations()) {
+ RegionMigrateService.getInstance()
+ .notifyRegionMigration(
+ new TNotifyRegionMigrationReq(
+ req.getLogicalClock(),
+ req.getHeartbeatTimestamp(),
+ req.getCurrentRegionOperations()));
+ }
+
return resp;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 86391555a8b..84ee70446ef 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -240,9 +240,10 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
- if (RegionMigrateService.getInstance().getLastNotifyTime() >
startTimeMs) {
+ if (RegionMigrateService.getInstance().getLastNotifyMigratingTime()
> startTimeMs
+ || RegionMigrateService.getInstance().mayHaveMigratingRegions())
{
LOGGER.warn(
- "LoadTsFileScheduler: Region migration started or ended during
loading TsFile {}, will convert to insertion to avoid data loss",
+ "LoadTsFileScheduler: Region migration was detected during
loading TsFile {}, will convert to insertion to avoid data loss",
filePath);
isLoadSingleTsFileSuccess = false;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 05328ea91b9..bc4b61e763a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -49,12 +49,12 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class RegionMigrateService implements IService {
@@ -75,10 +75,50 @@ public class RegionMigrateService implements IService {
private static final ConcurrentHashMap<Long, TRegionMigrateResult>
taskResultMap =
new ConcurrentHashMap<>();
- private static final AtomicLong lastNotifyTime = new
AtomicLong(Long.MIN_VALUE);
-
private static final TRegionMigrateResult unfinishedResult = new
TRegionMigrateResult();
+ /**
+ * Local view of the region operations most recently reported to this node.
+ *
+ * <p>Every update carries an ordering key (logicalClock, timestamp). An update is applied only
+ * if its key is strictly greater, in lexicographic order, than the last applied key, so late or
+ * duplicated messages are ignored. All methods synchronize on this instance.
+ */
+ private static class RegionMigrationStatusCache {
+
+ // Ordering key of the last applied update; -1 means nothing has been applied yet.
+ // NOTE(review): the senders in this commit fill the timestamp with System.nanoTime(), which
+ // may be negative; the -1 sentinel is only safe while logical clocks stay above -1 — confirm.
+ private long logicalClock = -1;
+ private long timestamp = -1;
+
+ // Region ids from the last applied update, stored by reference (not copied); readers below
+ // tolerate null.
+ private List<TConsensusGroupId> currentMigratingConsensusGroupIds =
Collections.emptyList();
+
+ // Local wall-clock time (ms) of the latest migration signal; Long.MIN_VALUE until one arrives.
+ private long lastNotifyMigratingTime = Long.MIN_VALUE;
+
+ /**
+ * Applies a snapshot of in-progress region operations if its ordering key is newer than the
+ * last applied one; otherwise leaves all state unchanged.
+ *
+ * @param newLogicalClock primary ordering component of the snapshot
+ * @param newTimestamp secondary ordering component, consulted only when logical clocks are
+ *     equal
+ * @param currentRegionOperations region ids with unfinished operations; a non-null, non-empty
+ *     list also refreshes the last migration signal time
+ */
+ public synchronized void update(
+ long newLogicalClock, long newTimestamp, List<TConsensusGroupId>
currentRegionOperations) {
+ // Drop snapshots whose key is not strictly newer than the stored key (stale or duplicate).
+ if (newLogicalClock < logicalClock
+ || (newLogicalClock == logicalClock && newTimestamp <= timestamp)) {
+ return;
+ }
+ logicalClock = newLogicalClock;
+ timestamp = newTimestamp;
+
+ currentMigratingConsensusGroupIds = currentRegionOperations;
+
+ // Any listed operation counts as a migration signal for time-window checks.
+ if (currentRegionOperations != null &&
!currentRegionOperations.isEmpty()) {
+ lastNotifyMigratingTime = System.currentTimeMillis();
+ }
+ }
+
+ /** Records the current local wall-clock time as the latest migration signal. */
+ public synchronized void notifyMigrating() {
+ lastNotifyMigratingTime = System.currentTimeMillis();
+ }
+
+ /** Returns true if the last applied snapshot listed at least one region. */
+ public synchronized boolean hasMigratingRegions() {
+ return currentMigratingConsensusGroupIds != null
+ && !currentMigratingConsensusGroupIds.isEmpty();
+ }
+
+ /**
+ * Returns the System.currentTimeMillis() value of the latest migration signal, or
+ * Long.MIN_VALUE if none has been recorded.
+ */
+ public synchronized long getLastNotifyMigratingTime() {
+ return lastNotifyMigratingTime;
+ }
+ }
+
+ private final RegionMigrationStatusCache regionMigrationStatusCache =
+ new RegionMigrationStatusCache();
+
private RegionMigrateService() {}
public static RegionMigrateService getInstance() {
@@ -86,16 +126,25 @@ public class RegionMigrateService implements IService {
}
+ /**
+ * Handles a region-migration notification. It applies the carried snapshot of in-progress
+ * region operations to the status cache. When the request names a specific region and a
+ * start/finish flag, it also records the signal time and logs the event.
+ *
+ * @param req notification carrying the ordering key (logical clock and timestamp), the current
+ *     region operations, and optionally the region id and start/finish flag
+ */
public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
- lastNotifyTime.set(System.currentTimeMillis());
- if (req.isIsStart()) {
- LOGGER.info("Region {} is notified to begin migrating",
req.getRegionId());
- } else {
- LOGGER.info("Region {} is notified to finish migrating",
req.getRegionId());
+ regionMigrationStatusCache.update(
+ req.getLogicalClock(), req.getTimestamp(),
req.getCurrentRegionOperations());
+
+ // Requests built from heartbeats leave regionId/isStart unset; only per-region
+ // notifications count as an explicit migration signal.
+ if (req.isSetIsStart() && req.isSetRegionId()) {
+ regionMigrationStatusCache.notifyMigrating();
+ if (req.isIsStart()) {
+ LOGGER.info("Region {} is notified to begin migrating",
req.getRegionId());
+ } else {
+ LOGGER.info("Region {} is notified to finish migrating",
req.getRegionId());
+ }
}
}
- public long getLastNotifyTime() {
- return lastNotifyTime.get();
+ /**
+ * Returns the local System.currentTimeMillis() value of the latest migration signal, or
+ * Long.MIN_VALUE if none has been received.
+ */
+ public long getLastNotifyMigratingTime() {
+ return regionMigrationStatusCache.getLastNotifyMigratingTime();
+ }
+
+ /**
+ * Reports whether the most recently applied snapshot listed any region operation. The snapshot
+ * is only as fresh as the last accepted update, hence "may".
+ *
+ * @return true if at least one region operation was listed
+ */
+ public boolean mayHaveMigratingRegions() {
+ return regionMigrationStatusCache.hasMigratingRegions();
}
/**
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 7008a983428..f44556a3787 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -53,8 +53,11 @@ struct TRegionMigrateResult {
}
struct TNotifyRegionMigrationReq {
- 1: required common.TConsensusGroupId regionId
- 2: required bool isStart
+ // Ordering key: the DataNode applies a notification only if (logicalClock, timestamp) is
+ // strictly newer than the last one it applied.
+ // NOTE(review): ids 1 and 2 previously carried regionId (struct) and isStart (bool); reusing
+ // them with type i64 makes this struct wire-incompatible with peers built from the old IDL.
+ // Confirm mixed-version clusters are unsupported, or move the new fields to fresh ids.
+ 1: required i64 logicalClock
+ 2: required i64 timestamp
+ // Set only for a per-region start/finish notification; absent for heartbeat-derived updates.
+ 3: optional common.TConsensusGroupId regionId
+ 4: optional bool isStart
+ // Region ids that have unfinished region operation procedures when the request is built.
+ 5: required list<common.TConsensusGroupId> currentRegionOperations
}
struct TCreatePeerReq {
@@ -279,6 +282,8 @@ struct TDataNodeHeartbeatReq {
11: optional set<common.TEndPoint> configNodeEndPoints
12: optional map<i32, common.TDataNodeLocation> dataNodes
13: optional map<i32, set<i32>> topology
+ 14: required i64 logicalClock
+ 15: optional list<common.TConsensusGroupId> currentRegionOperations
}
struct TDataNodeActivation {