This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 8889255c0c4 Load: Implement region operations cache for load × region migration detection (#15210)
8889255c0c4 is described below

commit 8889255c0c48e40faf6e17346587364fa1162a4a
Author: Li Yu Heng <[email protected]>
AuthorDate: Fri Mar 28 23:18:16 2025 +0800

    Load: Implement region operations cache for load × region migration detection (#15210)
    
    Co-authored-by: Steve Yurong Su <[email protected]>
---
 .../iotdb/confignode/manager/ProcedureManager.java | 43 +++++++++-----
 .../manager/load/service/HeartbeatService.java     | 12 ++++
 .../procedure/env/ConfigNodeProcedureEnv.java      | 11 +++-
 .../impl/region/RegionMigrateProcedure.java        |  8 +--
 .../impl/DataNodeInternalRPCServiceImpl.java       |  9 +++
 .../plan/scheduler/load/LoadTsFileScheduler.java   |  5 +-
 .../iotdb/db/service/RegionMigrateService.java     | 69 ++++++++++++++++++----
 .../src/main/thrift/datanode.thrift                |  9 ++-
 8 files changed, 133 insertions(+), 33 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index e47ed4abc69..74522357061 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -167,6 +167,7 @@ import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 public class ProcedureManager {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureManager.class);
@@ -709,16 +710,19 @@ public class ProcedureManager {
       TDataNodeLocation originalDataNode,
       TDataNodeLocation destDataNode,
       TDataNodeLocation coordinatorForAddPeer) {
-    String failMessage =
-        regionOperationCommonCheck(
-            regionGroupId,
-            destDataNode,
-            Arrays.asList(
-                new Pair<>("Original DataNode", originalDataNode),
-                new Pair<>("Destination DataNode", destDataNode),
-                new Pair<>("Coordinator for add peer", coordinatorForAddPeer)),
-            migrateRegionReq.getModel());
-    if (configManager
+    String failMessage;
+    if ((failMessage =
+            regionOperationCommonCheck(
+                regionGroupId,
+                destDataNode,
+                Arrays.asList(
+                    new Pair<>("Original DataNode", originalDataNode),
+                    new Pair<>("Destination DataNode", destDataNode),
+                    new Pair<>("Coordinator for add peer", 
coordinatorForAddPeer)),
+                migrateRegionReq.getModel()))
+        != null) {
+      // do nothing
+    } else if (configManager
         .getPartitionManager()
         .getAllReplicaSets(originalDataNode.getDataNodeId())
         .stream()
@@ -955,10 +959,7 @@ public class ProcedureManager {
 
   private String checkRegionOperationDuplication(TConsensusGroupId regionId) {
     List<? extends RegionOperationProcedure<?>> 
otherRegionMemberChangeProcedures =
-        getExecutor().getProcedures().values().stream()
-            .filter(procedure -> !procedure.isFinished())
-            .filter(procedure -> procedure instanceof RegionOperationProcedure)
-            .map(procedure -> (RegionOperationProcedure<?>) procedure)
+        getRegionOperationProcedures()
             .filter(
                 regionMemberChangeProcedure ->
                     regionId.equals(regionMemberChangeProcedure.getRegionId()))
@@ -971,6 +972,20 @@ public class ProcedureManager {
     return null;
   }
 
+  public List<TConsensusGroupId> getRegionOperationConsensusIds() {
+    return getRegionOperationProcedures()
+        .map(RegionOperationProcedure::getRegionId)
+        .distinct()
+        .collect(Collectors.toList());
+  }
+
+  private Stream<RegionOperationProcedure<?>> getRegionOperationProcedures() {
+    return getExecutor().getProcedures().values().stream()
+        .filter(procedure -> !procedure.isFinished())
+        .filter(procedure -> procedure instanceof RegionOperationProcedure)
+        .map(procedure -> (RegionOperationProcedure<?>) procedure);
+  }
+
   private String checkRegionOperationModelCorrectness(TConsensusGroupId 
regionId, Model model) {
     String databaseName = 
configManager.getPartitionManager().getRegionDatabase(regionId);
     boolean isTreeModelDatabase = 
databaseName.startsWith(SqlConstant.TREE_MODEL_DATABASE_PREFIX);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index ee6bed4c9e9..9e630733025 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import 
org.apache.iotdb.confignode.manager.load.cache.node.ConfigNodeHeartbeatCache;
 import org.apache.iotdb.confignode.manager.node.NodeManager;
 import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeHeartbeatReq;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.mpp.rpc.thrift.TDataNodeHeartbeatReq;
 
 import org.apache.tsfile.utils.Pair;
@@ -141,6 +142,11 @@ public class HeartbeatService {
     /* Generate heartbeat request */
     TDataNodeHeartbeatReq heartbeatReq = new TDataNodeHeartbeatReq();
     heartbeatReq.setHeartbeatTimestamp(System.nanoTime());
+    heartbeatReq.setLogicalClock(
+        configManager
+            .getConsensusManager()
+            .getConsensusImpl()
+            .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID));
     // Always sample RegionGroups' leadership as the Region heartbeat
     heartbeatReq.setNeedJudgeLeader(true);
     // We sample DataNode's load in every 10 heartbeat loop
@@ -169,6 +175,12 @@ public class HeartbeatService {
       
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
     }
 
+    // We broadcast region operations list every 100 heartbeat loops
+    if (heartbeatCounter.get() % 100 == 0) {
+      heartbeatReq.setCurrentRegionOperations(
+          
configManager.getProcedureManager().getRegionOperationConsensusIds());
+    }
+
     /* Update heartbeat counter */
     heartbeatCounter.getAndIncrement();
 
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 246fbad29b0..bf6e46ae0ff 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -59,6 +59,7 @@ import org.apache.iotdb.confignode.procedure.scheduler.ProcedureScheduler;
 import org.apache.iotdb.confignode.rpc.thrift.TAddConsensusGroupReq;
 import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
 import org.apache.iotdb.consensus.exception.ConsensusException;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import org.apache.iotdb.mpp.rpc.thrift.TActiveTriggerInstanceReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreateDataRegionReq;
 import org.apache.iotdb.mpp.rpc.thrift.TCreatePipePluginInstanceReq;
@@ -486,7 +487,15 @@ public class ConfigNodeProcedureEnv {
     final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
         configManager.getNodeManager().getRegisteredDataNodeLocations();
     final TNotifyRegionMigrationReq request =
-        new TNotifyRegionMigrationReq(consensusGroupId, isStart);
+        new TNotifyRegionMigrationReq(
+            configManager
+                .getConsensusManager()
+                .getConsensusImpl()
+                .getLogicalClock(ConfigNodeInfo.CONFIG_REGION_ID),
+            System.nanoTime(),
+            
configManager.getProcedureManager().getRegionOperationConsensusIds());
+    request.setRegionId(consensusGroupId);
+    request.setIsStart(isStart);
 
     final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus> 
clientHandler =
         new DataNodeAsyncRequestContext<>(
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index 1acdd2ccd54..3d70f04f76d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -81,8 +81,8 @@ public class RegionMigrateProcedure extends RegionOperationProcedure<RegionTrans
               "[pid{}][MigrateRegion] started, {} will be migrated from 
DataNode {} to {}.",
               getProcId(),
               regionId,
-              handler.simplifiedLocation(originalDataNode),
-              handler.simplifiedLocation(destDataNode));
+              RegionMaintainHandler.simplifiedLocation(originalDataNode),
+              RegionMaintainHandler.simplifiedLocation(destDataNode));
           addChildProcedure(new NotifyRegionMigrationProcedure(regionId, 
true));
           setNextState(RegionTransitionState.ADD_REGION_PEER);
           break;
@@ -120,8 +120,8 @@ public class RegionMigrateProcedure extends RegionOperationProcedure<RegionTrans
               getProcId(),
               cleanHint,
               regionId,
-              handler.simplifiedLocation(originalDataNode),
-              handler.simplifiedLocation(destDataNode),
+              RegionMaintainHandler.simplifiedLocation(originalDataNode),
+              RegionMaintainHandler.simplifiedLocation(destDataNode),
               CommonDateTimeUtils.convertMillisecondToDurationStr(
                   System.currentTimeMillis() - getSubmittedTime()),
               DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 96e6fae17c5..47fa540f51b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -1949,6 +1949,15 @@ public class DataNodeInternalRPCServiceImpl implements IDataNodeRPCService.Iface
       clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
     }
 
+    if (req.isSetCurrentRegionOperations()) {
+      RegionMigrateService.getInstance()
+          .notifyRegionMigration(
+              new TNotifyRegionMigrationReq(
+                  req.getLogicalClock(),
+                  req.getHeartbeatTimestamp(),
+                  req.getCurrentRegionOperations()));
+    }
+
     return resp;
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 86391555a8b..84ee70446ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -240,9 +240,10 @@ public class LoadTsFileScheduler implements IScheduler {
             }
           }
 
-          if (RegionMigrateService.getInstance().getLastNotifyTime() > 
startTimeMs) {
+          if (RegionMigrateService.getInstance().getLastNotifyMigratingTime() 
> startTimeMs
+              || RegionMigrateService.getInstance().mayHaveMigratingRegions()) 
{
             LOGGER.warn(
-                "LoadTsFileScheduler: Region migration started or ended during 
loading TsFile {}, will convert to insertion to avoid data loss",
+                "LoadTsFileScheduler: Region migration was detected during 
loading TsFile {}, will convert to insertion to avoid data loss",
                 filePath);
             isLoadSingleTsFileSuccess = false;
           }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 05328ea91b9..bc4b61e763a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -49,12 +49,12 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Collectors;
 
 public class RegionMigrateService implements IService {
@@ -75,10 +75,50 @@ public class RegionMigrateService implements IService {
   private static final ConcurrentHashMap<Long, TRegionMigrateResult> 
taskResultMap =
       new ConcurrentHashMap<>();
 
-  private static final AtomicLong lastNotifyTime = new 
AtomicLong(Long.MIN_VALUE);
-
   private static final TRegionMigrateResult unfinishedResult = new 
TRegionMigrateResult();
 
+  private static class RegionMigrationStatusCache {
+
+    private long logicalClock = -1;
+    private long timestamp = -1;
+
+    private List<TConsensusGroupId> currentMigratingConsensusGroupIds = 
Collections.emptyList();
+
+    private long lastNotifyMigratingTime = Long.MIN_VALUE;
+
+    public synchronized void update(
+        long newLogicalClock, long newTimestamp, List<TConsensusGroupId> 
currentRegionOperations) {
+      if (newLogicalClock < logicalClock
+          || (newLogicalClock == logicalClock && newTimestamp <= timestamp)) {
+        return;
+      }
+      logicalClock = newLogicalClock;
+      timestamp = newTimestamp;
+
+      currentMigratingConsensusGroupIds = currentRegionOperations;
+
+      if (currentRegionOperations != null && 
!currentRegionOperations.isEmpty()) {
+        lastNotifyMigratingTime = System.currentTimeMillis();
+      }
+    }
+
+    public synchronized void notifyMigrating() {
+      lastNotifyMigratingTime = System.currentTimeMillis();
+    }
+
+    public synchronized boolean hasMigratingRegions() {
+      return currentMigratingConsensusGroupIds != null
+          && !currentMigratingConsensusGroupIds.isEmpty();
+    }
+
+    public synchronized long getLastNotifyMigratingTime() {
+      return lastNotifyMigratingTime;
+    }
+  }
+
+  private final RegionMigrationStatusCache regionMigrationStatusCache =
+      new RegionMigrationStatusCache();
+
   private RegionMigrateService() {}
 
   public static RegionMigrateService getInstance() {
@@ -86,16 +126,25 @@ public class RegionMigrateService implements IService {
   }
 
   public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
-    lastNotifyTime.set(System.currentTimeMillis());
-    if (req.isIsStart()) {
-      LOGGER.info("Region {} is notified to begin migrating", 
req.getRegionId());
-    } else {
-      LOGGER.info("Region {} is notified to finish migrating", 
req.getRegionId());
+    regionMigrationStatusCache.update(
+        req.getLogicalClock(), req.getTimestamp(), 
req.getCurrentRegionOperations());
+
+    if (req.isSetIsStart() && req.isSetRegionId()) {
+      regionMigrationStatusCache.notifyMigrating();
+      if (req.isIsStart()) {
+        LOGGER.info("Region {} is notified to begin migrating", 
req.getRegionId());
+      } else {
+        LOGGER.info("Region {} is notified to finish migrating", 
req.getRegionId());
+      }
     }
   }
 
-  public long getLastNotifyTime() {
-    return lastNotifyTime.get();
+  public long getLastNotifyMigratingTime() {
+    return regionMigrationStatusCache.getLastNotifyMigratingTime();
+  }
+
+  public boolean mayHaveMigratingRegions() {
+    return regionMigrationStatusCache.hasMigratingRegions();
   }
 
   /**
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 7008a983428..f44556a3787 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -53,8 +53,11 @@ struct TRegionMigrateResult {
 }
 
 struct TNotifyRegionMigrationReq {
-  1: required common.TConsensusGroupId regionId
-  2: required bool isStart
+  1: required i64 logicalClock
+  2: required i64 timestamp
+  3: optional common.TConsensusGroupId regionId
+  4: optional bool isStart
+  5: required list<common.TConsensusGroupId> currentRegionOperations
 }
 
 struct TCreatePeerReq {
@@ -279,6 +282,8 @@ struct TDataNodeHeartbeatReq {
   11: optional set<common.TEndPoint> configNodeEndPoints
   12: optional map<i32, common.TDataNodeLocation> dataNodes
   13: optional map<i32, set<i32>> topology
+  14: required i64 logicalClock
+  15: optional list<common.TConsensusGroupId> currentRegionOperations
 }
 
 struct TDataNodeActivation {

Reply via email to