This is an automated email from the ASF dual-hosted git repository.

tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new af03c1692b4 [RTO/RPO] Topology awareness for query plan (#15014)
af03c1692b4 is described below

commit af03c1692b4da4016cc7a4304c7f2e18053c1562
Author: William Song <[email protected]>
AuthorDate: Sat Mar 22 17:03:16 2025 +0800

    [RTO/RPO] Topology awareness for query plan (#15014)
    
    * temp save for CI
    
    * save topology service
    
    * fix ut and it
    
    * fix ut and it
    
    * fix ut and it
    
    * fix ut and it
    
    * address review issues
    
    * address review issues
    
    * spotless
    
    * solve more conflicts and do regression test
    
    * spotless
    
    * fixing the ut and it
    
    * fixing the ut and it
    
    * address review issues
    
    * address review issues
    
    * address review issues
    
    * address review issues
---
 .../org/apache/iotdb/db/it/utils/TestUtils.java    |   8 +
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   1 +
 .../client/async/CnToDnAsyncRequestType.java       |   1 +
 .../CnToDnInternalServiceAsyncRequestManager.java  |   5 +
 .../rpc/DataNodeAsyncRequestRPCHandler.java        |   1 +
 .../iotdb/confignode/manager/ClusterManager.java   |  12 +-
 .../iotdb/confignode/manager/load/LoadManager.java |  10 +-
 .../manager/load/cache/AbstractLoadCache.java      |   2 +-
 .../manager/load/cache/IFailureDetector.java       |   5 +-
 .../confignode/manager/load/cache/LoadCache.java   |  29 +-
 .../manager/load/cache/detector/FixedDetector.java |   2 +-
 .../load/cache/detector/PhiAccrualDetector.java    |  19 +-
 .../load/cache/node/AINodeHeartbeatCache.java      |   2 +-
 .../load/cache/node/ConfigNodeHeartbeatCache.java  |   2 +-
 .../load/cache/node/DataNodeHeartbeatCache.java    |   2 +-
 .../manager/load/cache/region/RegionCache.java     |   9 +-
 .../load/cache/region/RegionGroupCache.java        |  15 +-
 .../manager/load/service/EventService.java         |  11 +-
 .../manager/load/service/HeartbeatService.java     |   8 +
 .../manager/load/service/TopologyService.java      | 297 +++++++++++++++++++++
 .../load/subscriber/IClusterStatusSubscriber.java  |   6 +-
 .../load/subscriber/NodeStatisticsChangeEvent.java |   3 +-
 .../runtime/PipeLeaderChangeHandler.java           |  12 -
 .../runtime/PipeRuntimeCoordinator.java            |  12 -
 .../manager/load/cache/RegionGroupCacheTest.java   |  19 +-
 .../manager/load/cache/detector/DetectorTest.java  |  21 +-
 .../client/dn/AsyncTSStatusRPCHandler.java         |  25 +-
 .../client/dn/DataNodeAsyncRequestRPCHandler.java  |  20 +-
 .../client/dn/DataNodeIntraHeartbeatManager.java   |  59 ++++
 .../impl/DataNodeInternalRPCServiceImpl.java       |  28 ++
 .../iotdb/db/queryengine/plan/ClusterTopology.java | 175 ++++++++++++
 .../SimpleFragmentParallelPlanner.java             |  11 +
 .../distribution/WriteFragmentParallelPlanner.java |   7 +-
 .../ReplicaSetUnreachableException.java}           |  25 +-
 .../exceptions/RootFIPlacementException.java}      |  25 +-
 .../plan/planner/plan/FragmentInstance.java        |   2 +-
 .../distribute/TableDistributedPlanGenerator.java  |  22 +-
 .../distribute/TableModelQueryFragmentPlanner.java |  12 +
 .../scheduler/FragmentInstanceDispatcherImpl.java  |   5 +
 .../apache/iotdb/db/utils/ErrorHandlingUtils.java  |   9 +-
 .../iotdb/commons/client/ClientPoolFactory.java    |   1 +
 .../client/request/AsyncRequestManager.java        |  10 +-
 .../DataNodeIntraHeartbeatRequestManager.java      |  44 +++
 .../client/request/TestConnectionUtils.java        |   3 +-
 .../iotdb/commons/concurrent/ThreadName.java       |   2 +
 .../iotdb/commons/partition/ExecutorType.java      |   8 +-
 .../iotdb/commons/partition/QueryExecutor.java     |   8 +-
 .../iotdb/commons/partition/StorageExecutor.java   |  10 +-
 .../thrift-commons/src/main/thrift/common.thrift   |   1 +
 .../src/main/thrift/datanode.thrift                |   4 +
 50 files changed, 906 insertions(+), 124 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 2a64278b1e8..fef5dc2e469 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -20,12 +20,14 @@
 package org.apache.iotdb.db.it.utils;
 
 import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.isession.SessionConfig;
 import org.apache.iotdb.isession.SessionDataSet;
 import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.StatementExecutionException;
@@ -43,6 +45,7 @@ import java.sql.Statement;
 import java.text.DateFormat;
 import java.time.ZoneId;
 import java.time.ZoneOffset;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -1706,6 +1709,11 @@ public class TestUtils {
     long retryIntervalMS = 1000;
     while (true) {
       try (Connection connection = EnvFactory.getEnv().getConnection()) {
+        final List<BaseNodeWrapper> allDataNodes =
+            new ArrayList<>(EnvFactory.getEnv().getDataNodeWrapperList());
+        EnvFactory.getEnv()
+            .ensureNodeStatus(
+                allDataNodes, Collections.nCopies(allDataNodes.size(), 
NodeStatus.Running));
         break;
       } catch (Exception e) {
         try {
diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b68215cd37f..58c9de7502c 100644
--- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -136,6 +136,7 @@ public enum TSStatusCode {
 
   QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719),
   QUERY_TIMEOUT(720),
+  PLAN_FAILED_NETWORK_PARTITION(721),
 
   // Arithmetic
   NUMERIC_VALUE_OUT_OF_RANGE(750),
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index 485bfe955a1..236a190812f 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -32,6 +32,7 @@ public enum CnToDnAsyncRequestType {
   SET_SYSTEM_STATUS,
   SET_CONFIGURATION,
   SUBMIT_TEST_CONNECTION_TASK,
+  SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
   TEST_CONNECTION,
 
   // Region Maintenance
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 0439e5233f2..3578168ea4b 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -380,6 +380,11 @@ public class CnToDnInternalServiceAsyncRequestManager
         (req, client, handler) ->
             client.submitTestConnectionTask(
                 (TNodeLocations) req, (SubmitTestConnectionTaskRPCHandler) 
handler));
+    actionMapBuilder.put(
+        CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+        (req, client, handler) ->
+            client.submitInternalTestConnectionTask(
+                (TNodeLocations) req, (SubmitTestConnectionTaskRPCHandler) 
handler));
     actionMapBuilder.put(
         CnToDnAsyncRequestType.TEST_CONNECTION,
         (req, client, handler) ->
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index f926589f9cc..6b0746f47e0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -174,6 +174,7 @@ public abstract class DataNodeAsyncRequestRPCHandler<Response>
             (Map<Integer, TRegionLeaderChangeResp>) responseMap,
             countDownLatch);
       case SUBMIT_TEST_CONNECTION_TASK:
+      case SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK:
         return new SubmitTestConnectionTaskRPCHandler(
             requestType,
             requestId,
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
index 0f79c50a49e..0bfc6df39e2 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -249,7 +249,8 @@ public class ClusterManager {
             location -> {
               TEndPoint endPoint = location.getInternalEndPoint();
               TServiceProvider serviceProvider =
-                  new TServiceProvider(endPoint, 
TServiceType.ConfigNodeInternalService);
+                  new TServiceProvider(
+                      endPoint, TServiceType.ConfigNodeInternalService, 
location.getConfigNodeId());
               TTestConnectionResult result =
                   new 
TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
               result.setSuccess(false).setReason(errorMessage);
@@ -261,7 +262,8 @@ public class ClusterManager {
             location -> {
               TEndPoint endPoint = location.getInternalEndPoint();
               TServiceProvider serviceProvider =
-                  new TServiceProvider(endPoint, 
TServiceType.DataNodeInternalService);
+                  new TServiceProvider(
+                      endPoint, TServiceType.DataNodeInternalService, 
location.getDataNodeId());
               TTestConnectionResult result =
                   new 
TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
               result.setSuccess(false).setReason(errorMessage);
@@ -274,7 +276,8 @@ public class ClusterManager {
               location -> {
                 TEndPoint endPoint = location.getMPPDataExchangeEndPoint();
                 TServiceProvider serviceProvider =
-                    new TServiceProvider(endPoint, 
TServiceType.DataNodeMPPService);
+                    new TServiceProvider(
+                        endPoint, TServiceType.DataNodeMPPService, 
location.getDataNodeId());
                 TTestConnectionResult result =
                     new TTestConnectionResult()
                         .setServiceProvider(serviceProvider)
@@ -288,7 +291,8 @@ public class ClusterManager {
               location -> {
                 TEndPoint endPoint = location.getClientRpcEndPoint();
                 TServiceProvider serviceProvider =
-                    new TServiceProvider(endPoint, 
TServiceType.DataNodeExternalService);
+                    new TServiceProvider(
+                        endPoint, TServiceType.DataNodeExternalService, 
location.getDataNodeId());
                 TTestConnectionResult result =
                     new TTestConnectionResult()
                         .setServiceProvider(serviceProvider)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 921dd2cd9ae..54dd582551d 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSamp
 import org.apache.iotdb.confignode.manager.load.service.EventService;
 import org.apache.iotdb.confignode.manager.load.service.HeartbeatService;
 import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
+import org.apache.iotdb.confignode.manager.load.service.TopologyService;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
 
@@ -72,6 +73,7 @@ public class LoadManager {
   protected HeartbeatService heartbeatService;
   private final StatisticsService statisticsService;
   private final EventService eventService;
+  private final TopologyService topologyService;
 
   public LoadManager(IManager configManager) {
     this.configManager = configManager;
@@ -83,7 +85,11 @@ public class LoadManager {
     this.loadCache = new LoadCache();
     setHeartbeatService(configManager, loadCache);
     this.statisticsService = new StatisticsService(loadCache);
-    this.eventService = new EventService(configManager, loadCache, 
routeBalancer);
+    this.topologyService = new TopologyService(configManager, 
loadCache::updateTopology);
+    this.eventService = new EventService(loadCache);
+    
this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
+    this.eventService.register(routeBalancer);
+    this.eventService.register(topologyService);
   }
 
   protected void setHeartbeatService(IManager configManager, LoadCache 
loadCache) {
@@ -146,6 +152,7 @@ public class LoadManager {
     statisticsService.startLoadStatisticsService();
     eventService.startEventService();
     partitionBalancer.setupPartitionBalancer();
+    topologyService.startTopologyService();
   }
 
   public void stopLoadServices() {
@@ -155,6 +162,7 @@ public class LoadManager {
     loadCache.clearHeartbeatCache();
     partitionBalancer.clearPartitionBalancer();
     routeBalancer.clearRegionPriority();
+    topologyService.stopTopologyService();
   }
 
   public void clearDataPartitionPolicyTable(String database) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index 6355dfdcc82..d61a0043520 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -57,7 +57,7 @@ public abstract class AbstractLoadCache {
                 CONF.getFailureDetectorPhiThreshold(),
                 CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
                 CONF.getHeartbeatIntervalInMs() * 200_000L,
-                60,
+                IFailureDetector.PHI_COLD_START_THRESHOLD,
                 new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() 
* 1000_000L));
         break;
       case IFailureDetector.FIXED_DETECTOR:
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
index be6a8f621ba..4ab397f95ad 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
@@ -29,11 +29,14 @@ public interface IFailureDetector {
   String FIXED_DETECTOR = "fixed";
   String PHI_ACCRUAL_DETECTOR = "phi_accrual";
 
+  int PHI_COLD_START_THRESHOLD = 60;
+
   /**
    * Given the heartbeat history, decide whether this endpoint is still 
available
    *
+   * @param id the unique identifier of the history owner
    * @param history heartbeat history
    * @return false if the endpoint is under failure
    */
-  boolean isAvailable(List<AbstractHeartbeatSample> history);
+  boolean isAvailable(Object id, List<AbstractHeartbeatSample> history);
 }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index af41555093e..666c676abdf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -49,12 +49,15 @@ import org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSamp
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
 
+import org.apache.thrift.annotation.Nullable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -94,6 +97,8 @@ public class LoadCache {
   private final Map<TConsensusGroupId, ConsensusGroupCache> 
consensusGroupCacheMap;
   // Map<DataNodeId, confirmedConfigNodes>
   private final Map<Integer, Set<TEndPoint>> confirmedConfigNodeMap;
+  private Map<Integer, Set<Integer>> topologyGraph;
+  private final AtomicBoolean topologyUpdated;
 
   public LoadCache() {
     this.nodeCacheMap = new ConcurrentHashMap<>();
@@ -102,6 +107,8 @@ public class LoadCache {
     this.regionSizeMap = new ConcurrentHashMap<>();
     this.consensusGroupCacheMap = new ConcurrentHashMap<>();
     this.confirmedConfigNodeMap = new ConcurrentHashMap<>();
+    this.topologyGraph = new HashMap<>();
+    this.topologyUpdated = new AtomicBoolean(false);
   }
 
   public void initHeartbeatCache(final IManager configManager) {
@@ -175,6 +182,7 @@ public class LoadCache {
                       regionGroupId,
                       new RegionGroupCache(
                           database,
+                          regionGroupId,
                           regionReplicaSet.getDataNodeLocations().stream()
                               .map(TDataNodeLocation::getDataNodeId)
                               .collect(Collectors.toSet()),
@@ -287,7 +295,8 @@ public class LoadCache {
       String database, TConsensusGroupId regionGroupId, Set<Integer> 
dataNodeIds) {
     boolean isStrongConsistency = 
CONF.isConsensusGroupStrongConsistency(regionGroupId);
     regionGroupCacheMap.put(
-        regionGroupId, new RegionGroupCache(database, dataNodeIds, 
isStrongConsistency));
+        regionGroupId,
+        new RegionGroupCache(database, regionGroupId, dataNodeIds, 
isStrongConsistency));
     consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache());
   }
 
@@ -299,7 +308,7 @@ public class LoadCache {
    */
   public void createRegionCache(TConsensusGroupId regionGroupId, int 
dataNodeId) {
     Optional.ofNullable(regionGroupCacheMap.get(regionGroupId))
-        .ifPresent(cache -> cache.createRegionCache(dataNodeId));
+        .ifPresent(cache -> cache.createRegionCache(dataNodeId, 
regionGroupId));
   }
 
   /**
@@ -769,6 +778,22 @@ public class LoadCache {
         regionGroupIds);
   }
 
+  public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
+    if (!latestTopology.equals(topologyGraph)) {
+      LOGGER.info("[Topology Service] Cluster topology changed, latest: {}", 
latestTopology);
+    }
+    topologyGraph = latestTopology;
+    topologyUpdated.set(true);
+  }
+
+  @Nullable
+  public Map<Integer, Set<Integer>> getTopology() {
+    if (topologyUpdated.compareAndSet(true, false)) {
+      return Collections.unmodifiableMap(topologyGraph);
+    }
+    return null;
+  }
+
   public void updateConfirmedConfigNodeEndPoints(
       int dataNodeId, Set<TEndPoint> configNodeEndPoints) {
     confirmedConfigNodeMap.put(dataNodeId, configNodeEndPoints);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
index 35fb4e3f2a1..b9183e62151 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
@@ -41,7 +41,7 @@ public class FixedDetector implements IFailureDetector {
   }
 
   @Override
-  public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+  public boolean isAvailable(Object id, List<AbstractHeartbeatSample> history) 
{
     final AbstractHeartbeatSample lastSample =
         history.isEmpty() ? null : history.get(history.size() - 1);
     if (lastSample != null) {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
index 39cc45c1519..cef58c77df0 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
@@ -24,6 +24,8 @@ import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
 
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
 import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
 import org.apache.tsfile.utils.Preconditions;
 import org.slf4j.Logger;
@@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
 /**
  * The Phi Failure Detector, proposed by Hayashibara, Naohiro, et al. "The/spl 
phi/accrual failure
@@ -49,6 +52,8 @@ public class PhiAccrualDetector implements IFailureDetector {
   private final long minHeartbeatStdNs;
   private final int codeStartSampleCount;
   private final IFailureDetector fallbackDuringColdStart;
+  /* We are using cache here to avoid managing entry life cycles manually */
+  private final Cache<Object, Boolean> availibilityCache;
 
   public PhiAccrualDetector(
       long threshold,
@@ -61,17 +66,23 @@ public class PhiAccrualDetector implements IFailureDetector {
     this.minHeartbeatStdNs = minHeartbeatStdNs;
     this.codeStartSampleCount = minimalSampleCount;
     this.fallbackDuringColdStart = fallbackDuringColdStart;
+    this.availibilityCache =
+        CacheBuilder.newBuilder().expireAfterAccess(5, 
TimeUnit.MINUTES).build();
   }
 
   @Override
-  public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+  public boolean isAvailable(Object id, List<AbstractHeartbeatSample> history) 
{
     if (history.size() < codeStartSampleCount) {
       /* We haven't received enough heartbeat replies.*/
-      return fallbackDuringColdStart.isAvailable(history);
+      return fallbackDuringColdStart.isAvailable(id, history);
     }
     final PhiAccrual phiAccrual = create(history);
     final boolean isAvailable = phiAccrual.phi() < (double) this.threshold;
-    if (!isAvailable && LOGGER.isDebugEnabled()) {
+
+    final Boolean previousAvailability = availibilityCache.getIfPresent(id);
+    availibilityCache.put(id, isAvailable);
+
+    if (Boolean.TRUE.equals(previousAvailability) && !isAvailable) {
       // log the status change and dump the heartbeat history for analysis use
       final StringBuilder builder = new StringBuilder();
       builder.append("[");
@@ -81,7 +92,7 @@ public class PhiAccrualDetector implements IFailureDetector {
       }
       builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
       builder.append("]");
-      LOGGER.debug(String.format("Node Down, heartbeat history (ms): %s", 
builder));
+      LOGGER.info(String.format("Node Down, heartbeat history (ms): %s", 
builder));
     }
 
     return isAvailable;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
index 187c35802af..ca15f9cb515 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
@@ -54,7 +54,7 @@ public class AINodeHeartbeatCache extends BaseNodeCache {
 
       if (lastSample != null && 
NodeStatus.Removing.equals(lastSample.getStatus())) {
         status = NodeStatus.Removing;
-      } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+      } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
         /* Failure detector decides that this AINode is UNKNOWN */
         status = NodeStatus.Unknown;
       } else if (lastSample != null) {
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index a4b8051cb74..4d675e1e8a4 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -66,7 +66,7 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
       if (lastSample == null) {
         /* First heartbeat not received from this ConfigNode, status is 
UNKNOWN */
         status = NodeStatus.Unknown;
-      } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+      } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
         /* Failure detector decides that this ConfigNode is UNKNOWN */
         status = NodeStatus.Unknown;
       } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 87dccb1465d..2e3e8906e74 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -68,7 +68,7 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
       if (lastSample == null) {
         /* First heartbeat not received from this DataNode, status is UNKNOWN 
*/
         status = NodeStatus.Unknown;
-      } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+      } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
         /* Failure detector decides that this DataNode is UNKNOWN */
         status = NodeStatus.Unknown;
       } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 9210585428d..a374c923f24 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.confignode.manager.load.cache.region;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
 import org.apache.iotdb.confignode.manager.load.cache.AbstractLoadCache;
 
+import org.apache.tsfile.utils.Pair;
+
 import java.util.Collections;
 import java.util.List;
 
@@ -31,9 +34,11 @@ import java.util.List;
  * statistics of the Region based on the latest RegionHeartbeatSample.
  */
 public class RegionCache extends AbstractLoadCache {
+  private final Pair<Integer, TConsensusGroupId> id;
 
-  public RegionCache() {
+  public RegionCache(int dataNodeId, TConsensusGroupId gid) {
     super();
+    this.id = new Pair<>(dataNodeId, gid);
     
this.currentStatistics.set(RegionStatistics.generateDefaultRegionStatistics());
   }
 
@@ -50,7 +55,7 @@ public class RegionCache extends AbstractLoadCache {
       if (lastSample == null) {
         /* First heartbeat not received from this region, status is UNKNOWN */
         status = RegionStatus.Unknown;
-      } else if (!failureDetector.isAvailable(history)) {
+      } else if (!failureDetector.isAvailable(id, history)) {
         /* Failure detector decides that this region is UNKNOWN */
         status = RegionStatus.Unknown;
       } else {
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index 0ee3f7d2cf8..e441054b1df 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -19,6 +19,7 @@
 
 package org.apache.iotdb.confignode.manager.load.cache.region;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
@@ -36,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference;
  * from all Regions it contains.
  */
 public class RegionGroupCache {
-
   private final String database;
   // Map<DataNodeId(where a RegionReplica resides in), RegionCache>
   private final Map<Integer, RegionCache> regionCacheMap;
@@ -45,10 +45,15 @@ public class RegionGroupCache {
   private final boolean isStrongConsistency;
 
   /** Constructor for create RegionGroupCache with default 
RegionGroupStatistics. */
-  public RegionGroupCache(String database, Set<Integer> dataNodeIds, boolean 
isStrongConsistency) {
+  public RegionGroupCache(
+      String database,
+      TConsensusGroupId groupId,
+      Set<Integer> dataNodeIds,
+      boolean isStrongConsistency) {
     this.database = database;
     this.regionCacheMap = new ConcurrentHashMap<>();
-    dataNodeIds.forEach(dataNodeId -> regionCacheMap.put(dataNodeId, new 
RegionCache()));
+    dataNodeIds.forEach(
+        dataNodeId -> regionCacheMap.put(dataNodeId, new 
RegionCache(dataNodeId, groupId)));
     this.currentStatistics =
         new 
AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
     this.isStrongConsistency = isStrongConsistency;
@@ -78,8 +83,8 @@ public class RegionGroupCache {
    *
    * @param dataNodeId the specified DataNode
+   * @param groupId the consensus group id passed to the new RegionCache together with dataNodeId
    */
-  public void createRegionCache(int dataNodeId) {
-    regionCacheMap.put(dataNodeId, new RegionCache());
+  public void createRegionCache(int dataNodeId, TConsensusGroupId groupId) {
+    regionCacheMap.put(dataNodeId, new RegionCache(dataNodeId, groupId));
   }
 
   /**
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
index 5f6035f0bca..52099bdfac3 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
@@ -25,13 +25,12 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
 import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
 import 
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
 import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
 
@@ -75,7 +74,7 @@ public class EventService {
       previousConsensusGroupStatisticsMap;
   private final EventBus eventPublisher;
 
-  public EventService(IManager configManager, LoadCache loadCache, 
RouteBalancer routeBalancer) {
+  public EventService(LoadCache loadCache) {
     this.loadCache = loadCache;
     this.previousNodeStatisticsMap = new TreeMap<>();
     this.previousRegionGroupStatisticsMap = new TreeMap<>();
@@ -86,8 +85,10 @@ public class EventService {
             ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName(),
             IoTDBThreadPoolFactory.newFixedThreadPool(
                 5, ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName()));
-    
eventPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
-    eventPublisher.register(routeBalancer);
+  }
+
+  /** Subscribe {@code listener} to the cluster-status events posted on this service's EventBus. */
+  public void register(final IClusterStatusSubscriber listener) {
+    this.eventPublisher.register(listener);
   }
 
   /** Start the event service. */
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index c713b0eb0a3..ee6bed4c9e9 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.Future;
@@ -161,6 +162,13 @@ public class HeartbeatService {
       
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
     }
 
+    final Map<Integer, Set<Integer>> topologyMap =
+        configManager.getLoadManager().getLoadCache().getTopology();
+    if (topologyMap != null) {
+      heartbeatReq.setTopology(topologyMap);
+      
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
+    }
+
     /* Update heartbeat counter */
     heartbeatCounter.getAndIncrement();
 
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
new file mode 100644
index 00000000000..7aa3ac91c59
--- /dev/null
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import 
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import 
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import 
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.ratis.util.AwaitForSignal;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Periodically asks the DataNodes to test their internal DataNode-to-DataNode connections, keeps a
+ * bounded history of successful probes per (fromDataNodeId, toDataNodeId) pair, runs the configured
+ * failure detector over each history and hands the resulting reachability map to {@code
+ * topologyChangeListener}.
+ *
+ * <p>Probing runs on a dedicated single-thread executor every {@code PROBING_INTERVAL_MS} ms, or
+ * earlier when a DataNode transitions from Unknown back to Running.
+ */
+public class TopologyService implements Runnable, IClusterStatusSubscriber {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TopologyService.class);
+  private static final long PROBING_INTERVAL_MS = 5_000L;
+  private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
+  private static final int SAMPLING_WINDOW_SIZE = 100;
+
+  private final ExecutorService topologyThread =
+      IoTDBThreadPoolFactory.newSingleThreadExecutor(
+          ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+  private Future<?> future = null;
+  private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+
+  private final AwaitForSignal awaitForSignal;
+  private final IManager configManager;
+
+  private final AtomicBoolean shouldRun;
+
+  /* (fromDataNodeId, toDataNodeId) -> heartbeat history */
+  private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> heartbeats;
+  private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
+
+  private final IFailureDetector failureDetector;
+  private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
+
+  /**
+   * @param configManager used to look up the registered DataNodes on every probing round
+   * @param topologyChangeListener receives, after each round, a map from every probed DataNode id
+   *     to the ids of the DataNodes the failure detector currently considers reachable from it
+   */
+  public TopologyService(
+      IManager configManager, Consumer<Map<Integer, Set<Integer>>> topologyChangeListener) {
+    this.configManager = configManager;
+    this.topologyChangeListener = topologyChangeListener;
+    this.heartbeats = new ConcurrentHashMap<>();
+    this.shouldRun = new AtomicBoolean(false);
+    this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+
+    // Build the failure detector from the ConfigNode's failure-detector settings. The configured
+    // values are in milliseconds while the detectors work in nanoseconds, hence the 1_000_000
+    // factors (200_000 == 0.2 * 1_000_000, i.e. 20% of the heartbeat interval).
+    switch (CONF.getFailureDetector()) {
+      case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+        this.failureDetector =
+            new PhiAccrualDetector(
+                CONF.getFailureDetectorPhiThreshold(),
+                CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
+                CONF.getHeartbeatIntervalInMs() * 200_000L,
+                IFailureDetector.PHI_COLD_START_THRESHOLD,
+                new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L));
+        break;
+      case IFailureDetector.FIXED_DETECTOR:
+      default:
+        this.failureDetector =
+            new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * 1000_000L);
+    }
+  }
+
+  /** Start the probing loop; calling it while the loop is already running has no extra effect. */
+  public synchronized void startTopologyService() {
+    // Set the flag BEFORE submitting: run() checks it first, so setting it afterwards lets the
+    // worker observe false, exit immediately and leave a finished future that blocks restarts.
+    shouldRun.set(true);
+    if (future == null || future.isDone()) {
+      future = this.topologyThread.submit(this);
+    }
+    LOGGER.info("Topology Probing has started successfully");
+  }
+
+  /** Stop the probing loop and drop all collected probing history. Safe to call repeatedly. */
+  public synchronized void stopTopologyService() {
+    shouldRun.set(false);
+    // Guard against stop-before-start or a double stop, which would otherwise NPE here.
+    if (future != null) {
+      future.cancel(true);
+      future = null;
+    }
+    heartbeats.clear();
+    LOGGER.info("Topology Probing has stopped successfully");
+  }
+
+  /**
+   * Schedule the {@link #topologyProbing} task either: 1. every PROBING_INTERVAL_MS interval. 2.
+   * Manually triggered by outside events (node restart / register, etc.).
+   *
+   * @return {@code false} if the wait was interrupted, which ends the probing loop
+   */
+  private boolean mayWait() {
+    try {
+      this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+      return true;
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for the executor; returning false ends run().
+      Thread.currentThread().interrupt();
+      return false;
+    }
+  }
+
+  @Override
+  public void run() {
+    while (shouldRun.get() && mayWait()) {
+      topologyProbing();
+    }
+  }
+
+  /**
+   * One probing round: send the connection-test RPC, record successful probes, evaluate every
+   * (from, to) history with the failure detector and publish the reachable-set map.
+   */
+  private synchronized void topologyProbing() {
+    // 1. get the latest datanode list (one snapshot, reused for the request target map below)
+    final List<TDataNodeConfiguration> registeredDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodes();
+    final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+    final Set<Integer> dataNodeIds = new HashSet<>();
+    for (final TDataNodeConfiguration dataNodeConf : registeredDataNodes) {
+      final TDataNodeLocation location = dataNodeConf.getLocation();
+      if (startingDataNodes.contains(location.getDataNodeId())) {
+        continue; // we shall wait for internal endpoint to be ready
+      }
+      dataNodeLocations.add(location);
+      dataNodeIds.add(location.getDataNodeId());
+    }
+
+    // 2. send the verify connection RPC to all datanodes; the payload lists only the non-starting
+    // DataNodes, while the request targets every registered DataNode
+    final TNodeLocations nodeLocations = new TNodeLocations();
+    nodeLocations.setDataNodeLocations(dataNodeLocations);
+    nodeLocations.setConfigNodeLocations(Collections.emptyList());
+    final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+        registeredDataNodes.stream()
+            .map(TDataNodeConfiguration::getLocation)
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location));
+    final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
+        dataNodeAsyncRequestContext =
+            new DataNodeAsyncRequestContext<>(
+                CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+                nodeLocations,
+                dataNodeLocationMap);
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS);
+    final List<TTestConnectionResult> results = new ArrayList<>();
+    dataNodeAsyncRequestContext
+        .getResponseMap()
+        .forEach(
+            (nodeId, resp) -> {
+              if (resp.isSetResultList()) {
+                results.addAll(resp.getResultList());
+              }
+            });
+
+    // 3. collect results and update the heartbeat timestamps
+    for (final TTestConnectionResult result : results) {
+      final int fromDataNodeId =
+          Optional.ofNullable(result.getSender().getDataNodeLocation())
+              .map(TDataNodeLocation::getDataNodeId)
+              .orElse(-1);
+      final int toDataNodeId = result.getServiceProvider().getNodeId();
+      if (result.isSuccess()
+          && dataNodeIds.contains(fromDataNodeId)
+          && dataNodeIds.contains(toDataNodeId)) {
+        // testAllDataNodeConnectionWithTimeout ensures the heartbeats are Dn-Dn internally. Here we
+        // just double-check.
+        final List<AbstractHeartbeatSample> heartbeatHistory =
+            heartbeats.computeIfAbsent(
+                new Pair<>(fromDataNodeId, toDataNodeId), p -> new LinkedList<>());
+        heartbeatHistory.add(new NodeHeartbeatSample(NodeStatus.Running));
+        if (heartbeatHistory.size() > SAMPLING_WINDOW_SIZE) {
+          heartbeatHistory.remove(0);
+        }
+      }
+    }
+
+    // 4. use failure detector to identify potential network partitions
+    final Map<Integer, Set<Integer>> latestTopology =
+        dataNodeLocations.stream()
+            .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>()));
+    final Map<Integer, Set<Integer>> partitioned = new HashMap<>();
+    for (final Map.Entry<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> entry :
+        heartbeats.entrySet()) {
+      final int fromId = entry.getKey().getLeft();
+      final int toId = entry.getKey().getRight();
+      if (!dataNodeIds.contains(fromId) || !dataNodeIds.contains(toId)) {
+        // Stale history of a DataNode that is not probed this round (no longer registered, or
+        // still starting up). Without this guard latestTopology.get(fromId) below could be null.
+        continue;
+      }
+      if (!entry.getValue().isEmpty()
+          && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
+        LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId);
+        partitioned.computeIfAbsent(fromId, id -> new HashSet<>()).add(toId);
+      } else {
+        latestTopology.get(fromId).add(toId);
+      }
+    }
+
+    if (!partitioned.isEmpty()) {
+      logAsymmetricPartition(partitioned);
+    }
+
+    // 5. notify the listeners on topology change
+    if (shouldRun.get()) {
+      topologyChangeListener.accept(latestTopology);
+    }
+  }
+
+  /** Warn about every broken link (from -> to) whose reverse link (to -> from) is not broken. */
+  private void logAsymmetricPartition(final Map<Integer, Set<Integer>> partitioned) {
+    for (final Map.Entry<Integer, Set<Integer>> entry : partitioned.entrySet()) {
+      final int fromId = entry.getKey();
+      for (final int toId : entry.getValue()) {
+        if (!partitioned.getOrDefault(toId, Collections.emptySet()).contains(fromId)) {
+          LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", fromId, toId);
+        }
+      }
+    }
+  }
+
+  /** We only listen to datanode remove / restart / register events */
+  @Override
+  public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
+    final Set<Integer> datanodeIds =
+        configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
+    final Map<Integer, Pair<NodeStatistics, NodeStatistics>> changes =
+        event.getDifferentNodeStatisticsMap();
+    for (final Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> entry :
+        changes.entrySet()) {
+      final Integer nodeId = entry.getKey();
+      final Pair<NodeStatistics, NodeStatistics> changeEvent = entry.getValue();
+      if (changeEvent.getRight() == null) {
+        // Node removed from the cluster: clean up probing history. This is handled before the
+        // registration check below because the DataNode may already be gone from the registered
+        // list when this event arrives; skipping it would leak its history.
+        startingDataNodes.remove(nodeId);
+        removeProbingHistory(nodeId);
+        continue;
+      }
+      if (!datanodeIds.contains(nodeId)) {
+        continue;
+      }
+      if (changeEvent.getLeft() == null) {
+        // if a new datanode registered, DO NOT trigger probing immediately
+        startingDataNodes.add(nodeId);
+        continue;
+      }
+      startingDataNodes.remove(nodeId);
+
+      // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING
+      if (NodeStatus.Unknown.equals(changeEvent.getLeft().getStatus())
+          && NodeStatus.Running.equals(changeEvent.getRight().getStatus())) {
+        awaitForSignal.signal();
+      }
+    }
+  }
+
+  /** Drop every probing history whose sender or receiver is {@code nodeId}. */
+  private void removeProbingHistory(final Integer nodeId) {
+    heartbeats
+        .keySet()
+        .removeIf(
+            pair ->
+                Objects.equals(pair.getLeft(), nodeId) || Objects.equals(pair.getRight(), nodeId));
+  }
+}
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
index 6c1cf3c3fad..83f1dc1bc83 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
@@ -26,13 +26,13 @@ public interface IClusterStatusSubscriber {
 
   @Subscribe
   @AllowConcurrentEvents
-  void onNodeStatisticsChanged(NodeStatisticsChangeEvent event);
+  default void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {}
 
   @Subscribe
   @AllowConcurrentEvents
-  void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent event);
+  default void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent 
event) {}
 
   @Subscribe
   @AllowConcurrentEvents
-  void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent 
event);
+  default void 
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {}
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
index 11da0dc2998..caf3db4806d 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
@@ -23,6 +23,7 @@ import 
org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
 
 import org.apache.tsfile.utils.Pair;
 
+import java.util.Collections;
 import java.util.Map;
 
 /** NodeStatisticsChangeEvent represents the change of Node statistics. */
@@ -37,6 +38,6 @@ public class NodeStatisticsChangeEvent {
   }
 
+  /**
+   * Returns a read-only view of the changed statistics keyed by node id. Subscribers such as
+   * TopologyService read each pair as (previous, current): a null left marks a newly added node and
+   * a null right a removed one.
+   */
   public Map<Integer, Pair<NodeStatistics, NodeStatistics>> 
getDifferentNodeStatisticsMap() {
-    return differentNodeStatisticsMap;
+    return Collections.unmodifiableMap(differentNodeStatisticsMap);
   }
 }
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
index 9121ba3caa4..01ede42afcd 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
-import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
-import 
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
 
 import org.apache.tsfile.utils.Pair;
 
@@ -59,16 +57,6 @@ public class PipeLeaderChangeHandler implements 
IClusterStatusSubscriber {
     onConsensusGroupStatisticsChanged(new 
ConsensusGroupStatisticsChangeEvent(virtualChangeMap));
   }
 
-  @Override
-  public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
-    // Do nothing
-  }
-
-  @Override
-  public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent 
event) {
-    // Do nothing
-  }
-
   @Override
   public void 
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
     // If no pipe tasks, return
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 5b6369a2c3c..1e8f32eda76 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
 import 
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
-import 
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
-import 
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
 import 
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;
 
@@ -70,16 +68,6 @@ public class PipeRuntimeCoordinator implements 
IClusterStatusSubscriber {
     pipeLeaderChangeHandler.onConfigRegionGroupLeaderChanged();
   }
 
-  @Override
-  public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) {
-    // Do nothing
-  }
-
-  @Override
-  public void onRegionGroupStatisticsChanged(final 
RegionGroupStatisticsChangeEvent event) {
-    // Do nothing
-  }
-
   @Override
   public synchronized void onConsensusGroupStatisticsChanged(
       final ConsensusGroupStatisticsChangeEvent event) {
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
index 69f073aad8b..306146f6b90 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iotdb.confignode.manager.load.cache;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.commons.cluster.RegionStatus;
 import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
 import 
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
@@ -32,12 +34,15 @@ import java.util.stream.Stream;
 public class RegionGroupCacheTest {
 
   private static final String DATABASE = "root.db";
+  private static final TConsensusGroupId GROUP_ID =
+      new TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
 
   @Test
   public void getRegionStatusTest() {
     long currentTime = System.nanoTime();
     RegionGroupCache regionGroupCache =
-        new RegionGroupCache(DATABASE, Stream.of(0, 1, 2, 3, 
4).collect(Collectors.toSet()), false);
+        new RegionGroupCache(
+            DATABASE, GROUP_ID, Stream.of(0, 1, 2, 3, 
4).collect(Collectors.toSet()), false);
     regionGroupCache.cacheHeartbeatSample(
         0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
     regionGroupCache.cacheHeartbeatSample(
@@ -66,7 +71,8 @@ public class RegionGroupCacheTest {
   public void weakConsistencyRegionGroupStatusTest() {
     long currentTime = System.nanoTime();
     RegionGroupCache regionGroupCache =
-        new RegionGroupCache(DATABASE, Stream.of(0, 1, 
2).collect(Collectors.toSet()), false);
+        new RegionGroupCache(
+            DATABASE, GROUP_ID, Stream.of(0, 1, 
2).collect(Collectors.toSet()), false);
     regionGroupCache.cacheHeartbeatSample(
         0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
     regionGroupCache.cacheHeartbeatSample(
@@ -102,7 +108,8 @@ public class RegionGroupCacheTest {
   public void strongConsistencyRegionGroupStatusTest() {
     long currentTime = System.nanoTime();
     RegionGroupCache regionGroupCache =
-        new RegionGroupCache(DATABASE, Stream.of(0, 1, 
2).collect(Collectors.toSet()), true);
+        new RegionGroupCache(
+            DATABASE, GROUP_ID, Stream.of(0, 1, 
2).collect(Collectors.toSet()), true);
     regionGroupCache.cacheHeartbeatSample(
         0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
     regionGroupCache.cacheHeartbeatSample(
@@ -137,7 +144,7 @@ public class RegionGroupCacheTest {
   public void migrateRegionRegionGroupStatusTest() {
     long currentTime = System.nanoTime();
     RegionGroupCache regionGroupCache =
-        new RegionGroupCache(DATABASE, 
Stream.of(0).collect(Collectors.toSet()), true);
+        new RegionGroupCache(DATABASE, GROUP_ID, 
Stream.of(0).collect(Collectors.toSet()), true);
     regionGroupCache.cacheHeartbeatSample(
         0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
     regionGroupCache.updateCurrentStatistics();
@@ -145,7 +152,7 @@ public class RegionGroupCacheTest {
         RegionGroupStatus.Running, 
regionGroupCache.getCurrentStatistics().getRegionGroupStatus());
 
     regionGroupCache =
-        new RegionGroupCache(DATABASE, Stream.of(0, 
1).collect(Collectors.toSet()), true);
+        new RegionGroupCache(DATABASE, GROUP_ID, Stream.of(0, 
1).collect(Collectors.toSet()), true);
     regionGroupCache.cacheHeartbeatSample(
         0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
     regionGroupCache.cacheHeartbeatSample(
@@ -155,7 +162,7 @@ public class RegionGroupCacheTest {
         RegionGroupStatus.Running, 
regionGroupCache.getCurrentStatistics().getRegionGroupStatus());
 
     regionGroupCache =
-        new RegionGroupCache(DATABASE, Stream.of(0, 
1).collect(Collectors.toSet()), true);
+        new RegionGroupCache(DATABASE, GROUP_ID, Stream.of(0, 
1).collect(Collectors.toSet()), true);
     regionGroupCache.cacheHeartbeatSample(
         0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
     regionGroupCache.cacheHeartbeatSample(
diff --git 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
index a56a7166646..60af7142a6c 100644
--- 
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
+++ 
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
@@ -33,6 +33,7 @@ import java.util.List;
 public class DetectorTest {
 
   final long sec = 1_000_000_000L;
+  final long id = 114514L; // arbitrary id passed as the first argument of isAvailable()
   final FixedDetector fixedDetector = new FixedDetector(20 * sec);
   final PhiAccrualDetector phiAccrualDetector =
       new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 0, 
fixedDetector);
@@ -53,13 +54,13 @@ public class DetectorTest {
     final long lastHeartbeatTs = System.nanoTime() - 21 * sec;
     final List<AbstractHeartbeatSample> history =
         Collections.singletonList(new NodeHeartbeatSample(lastHeartbeatTs, 
NodeStatus.Running));
-    Assert.assertFalse(fixedDetector.isAvailable(history));
+    Assert.assertFalse(fixedDetector.isAvailable(id, history));
 
     final long lastAvailableHeartbeat = System.nanoTime() - 18 * sec;
     final List<AbstractHeartbeatSample> history2 =
         Collections.singletonList(
             new NodeHeartbeatSample(lastAvailableHeartbeat, 
NodeStatus.Running));
-    Assert.assertTrue(fixedDetector.isAvailable(history2));
+    Assert.assertTrue(fixedDetector.isAvailable(id, history2));
   }
 
   @Test
@@ -107,8 +108,8 @@ public class DetectorTest {
   public void testComparisonQuickFailureDetection() {
     long[] interval = new long[] {sec, sec, sec};
     List<AbstractHeartbeatSample> history = fromInterval(interval, 13 * sec);
-    Assert.assertTrue(fixedDetector.isAvailable(history));
-    Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+    Assert.assertTrue(fixedDetector.isAvailable(id, history));
+    Assert.assertFalse(phiAccrualDetector.isAvailable(id, history));
   }
 
   /**
@@ -121,8 +122,8 @@ public class DetectorTest {
     long[] interval = new long[] {sec, sec, sec};
     long gcPause = 15 * sec;
     List<AbstractHeartbeatSample> history = fromInterval(interval, gcPause + 2 
* sec);
-    Assert.assertTrue(fixedDetector.isAvailable(history));
-    Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+    Assert.assertTrue(fixedDetector.isAvailable(id, history));
+    Assert.assertFalse(phiAccrualDetector.isAvailable(id, history));
   }
 
   /**
@@ -148,8 +149,8 @@ public class DetectorTest {
           sec
         };
     List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
-    Assert.assertFalse(fixedDetector.isAvailable(history));
-    Assert.assertTrue(phiAccrualDetector.isAvailable(history));
+    Assert.assertFalse(fixedDetector.isAvailable(id, history));
+    Assert.assertTrue(phiAccrualDetector.isAvailable(id, history));
   }
 
   /**
@@ -161,8 +162,8 @@ public class DetectorTest {
         new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 60, 
fixedDetector);
     long[] interval = new long[] {sec, sec, sec};
     List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
-    Assert.assertFalse(fixedDetector.isAvailable(history));
-    Assert.assertFalse(coldStartPhi.isAvailable(history));
+    Assert.assertFalse(fixedDetector.isAvailable(id, history));
+    Assert.assertFalse(coldStartPhi.isAvailable(id, history));
   }
 
   private List<AbstractHeartbeatSample> fromInterval(long[] interval, long 
timeElapsed) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
index 9672d431c38..04dfb20879d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
 /** General RPC handler for TSStatus response type. */
 public class AsyncTSStatusRPCHandler extends 
DataNodeAsyncRequestRPCHandler<TSStatus> {
 
+  private final boolean keepSilent;
   private static final Logger LOGGER = 
LoggerFactory.getLogger(AsyncTSStatusRPCHandler.class);
 
   public AsyncTSStatusRPCHandler(
@@ -41,8 +42,10 @@ public class AsyncTSStatusRPCHandler extends 
DataNodeAsyncRequestRPCHandler<TSSt
       TDataNodeLocation targetDataNode,
       Map<Integer, TDataNodeLocation> dataNodeLocationMap,
       Map<Integer, TSStatus> responseMap,
-      CountDownLatch countDownLatch) {
+      CountDownLatch countDownLatch,
+      boolean keepSilent) {
     super(requestType, requestId, targetDataNode, dataNodeLocationMap, 
responseMap, countDownLatch);
+    this.keepSilent = keepSilent;
   }
 
   @Override
@@ -53,13 +56,17 @@ public class AsyncTSStatusRPCHandler extends 
DataNodeAsyncRequestRPCHandler<TSSt
     if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
       // Remove only if success
       nodeLocationMap.remove(requestId);
-      LOGGER.info("Successfully {} on DataNode: {}", requestType, 
formattedTargetLocation);
+      if (!keepSilent) {
+        LOGGER.info("Successfully {} on DataNode: {}", requestType, 
formattedTargetLocation);
+      }
     } else {
-      LOGGER.error(
-          "Failed to {} on DataNode: {}, response: {}",
-          requestType,
-          formattedTargetLocation,
-          response);
+      if (!keepSilent) {
+        LOGGER.error(
+            "Failed to {} on DataNode: {}, response: {}",
+            requestType,
+            formattedTargetLocation,
+            response);
+      }
     }
 
     // Always CountDown
@@ -75,7 +82,9 @@ public class AsyncTSStatusRPCHandler extends 
DataNodeAsyncRequestRPCHandler<TSSt
             + formattedTargetLocation
             + ", exception: "
             + e.getMessage();
-    LOGGER.error(errorMsg);
+    if (!keepSilent) {
+      LOGGER.error(errorMsg);
+    }
 
     responseMap.put(
         requestId,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
index a15e4a0b10f..5976deb6d50 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
@@ -68,18 +68,24 @@ public abstract class 
DataNodeAsyncRequestRPCHandler<Response>
     final Map<Integer, TDataNodeLocation> nodeLocationMap = 
context.getNodeLocationMap();
     final Map<Integer, ?> responseMap = context.getResponseMap();
     final CountDownLatch countDownLatch = context.getCountDownLatch();
+    final boolean keepSilent;
     switch (requestType) {
       case TEST_CONNECTION:
+        keepSilent = true;
+        break;
       case UPDATE_ATTRIBUTE:
-        return new AsyncTSStatusRPCHandler(
-            requestType,
-            requestId,
-            targetDataNode,
-            nodeLocationMap,
-            (Map<Integer, TSStatus>) responseMap,
-            countDownLatch);
+        keepSilent = false;
+        break;
       default:
         throw new UnsupportedOperationException("request type is not 
supported: " + requestType);
     }
+    return new AsyncTSStatusRPCHandler(
+        requestType,
+        requestId,
+        targetDataNode,
+        nodeLocationMap,
+        (Map<Integer, TSStatus>) responseMap,
+        countDownLatch,
+        keepSilent);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
new file mode 100644
index 00000000000..d0ba1ba389f
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.protocol.client.dn;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.client.request.AsyncRequestContext;
+import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
+import 
org.apache.iotdb.commons.client.request.DataNodeIntraHeartbeatRequestManager;
+
+public class DataNodeIntraHeartbeatManager
+    extends DataNodeIntraHeartbeatRequestManager<DnToDnRequestType> {
+  // Registers a single action: TEST_CONNECTION, sent through testConnectionEmptyRPC.
+  @Override
+  protected void initActionMapBuilder() {
+    actionMapBuilder.put(
+        DnToDnRequestType.TEST_CONNECTION,
+        (req, client, handler) -> 
client.testConnectionEmptyRPC((AsyncTSStatusRPCHandler) handler));
+  }
+
+  @Override
+  protected AsyncRequestRPCHandler<?, DnToDnRequestType, TDataNodeLocation> 
buildHandler(
+      AsyncRequestContext<?, ?, DnToDnRequestType, TDataNodeLocation> 
requestContext,
+      int requestId,
+      TDataNodeLocation targetNode) {
+    return DataNodeAsyncRequestRPCHandler.createAsyncRPCHandler(
+        requestContext, requestId, targetNode);
+  }
+
+  private static class ClientPoolHolder {
+    // Initialization-on-demand holder for the instance returned by getInstance().
+    private static final DataNodeIntraHeartbeatManager INSTANCE =
+        new DataNodeIntraHeartbeatManager();
+
+    private ClientPoolHolder() {
+      // Not instantiable; INSTANCE is created when getInstance() first reads it.
+    }
+  }
+
+  public static DataNodeIntraHeartbeatManager getInstance() {
+    return DataNodeIntraHeartbeatManager.ClientPoolHolder.INSTANCE;
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b2d380c35ca..47c84e5857b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import 
org.apache.iotdb.db.protocol.client.cn.DnToCnInternalServiceAsyncRequestManager;
 import org.apache.iotdb.db.protocol.client.cn.DnToCnRequestType;
 import 
org.apache.iotdb.db.protocol.client.dn.DataNodeExternalServiceAsyncRequestManager;
+import org.apache.iotdb.db.protocol.client.dn.DataNodeIntraHeartbeatManager;
 import 
org.apache.iotdb.db.protocol.client.dn.DataNodeMPPServiceAsyncRequestManager;
 import 
org.apache.iotdb.db.protocol.client.dn.DnToDnInternalServiceAsyncRequestManager;
 import org.apache.iotdb.db.protocol.client.dn.DnToDnRequestType;
@@ -104,6 +105,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
 import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.ISchemaSource;
 import 
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
 import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -350,6 +352,8 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
   private final DataNodeThrottleQuotaManager throttleQuotaManager =
       DataNodeThrottleQuotaManager.getInstance();
 
+  private final ClusterTopology clusterTopology = 
ClusterTopology.getInstance();
+
   private final CommonConfig commonConfig = 
CommonDescriptor.getInstance().getConfig();
 
   private static final String SYSTEM = "system";
@@ -1763,6 +1767,14 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
             .collect(Collectors.toList()));
   }
 
+  @Override
+  public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations 
nodeLocations)
+      throws TException {
+    return new TTestConnectionResp(
+        new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+        
testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations()));
+  }
+
   private static <Location, RequestType> List<TTestConnectionResult> 
testConnections(
       final List<Location> nodeLocations,
       final Function<Location, Integer> getId,
@@ -1790,6 +1802,18 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
             
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
   }
 
+  private List<TTestConnectionResult> 
testAllDataNodeConnectionInHeartbeatChannel(
+      List<TDataNodeLocation> dataNodeLocations) {
+    return testConnections(
+        dataNodeLocations,
+        TDataNodeLocation::getDataNodeId,
+        TDataNodeLocation::getInternalEndPoint,
+        TServiceType.DataNodeInternalService,
+        DnToDnRequestType.TEST_CONNECTION,
+        (AsyncRequestContext<Object, TSStatus, DnToDnRequestType, 
TDataNodeLocation> handler) ->
+            
DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null, 
true));
+  }
+
   private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
       List<TDataNodeLocation> dataNodeLocations) {
     return testConnections(
@@ -1921,6 +1945,10 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       }
     }
 
+    if (req.isSetTopology() && req.isSetDataNodes()) {
+      clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
+    }
+
     return resp;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
new file mode 100644
index 00000000000..2357a5f50f1
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * DataNode-local view of cluster connectivity, as reported by the ConfigNode, which the fragment
+ * planners consult to avoid replicas that cannot be reached.
+ *
+ * <p>{@link #updateTopology} replaces {@code dataNodes}, {@code topologyMap} and {@code
+ * isPartitioned} one after another, not atomically, so a concurrent reader may observe a mix of
+ * two consecutive views; readers therefore snapshot each map and tolerate missing entries.
+ */
+public class ClusterTopology {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private final AtomicReference<Map<Integer, TDataNodeLocation>> dataNodes;
+  private final AtomicReference<Map<Integer, Set<Integer>>> topologyMap;
+  private final AtomicBoolean isPartitioned = new AtomicBoolean();
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  /**
+   * Restricts {@code origin} to the replicas that are reachable from this DataNode.
+   *
+   * <p>If no partition is detected, or {@code origin} is {@code null}, {@code origin} itself is
+   * returned. Otherwise a new replica set holding only the reachable locations (in their original
+   * order) is returned; it may contain no location at all, which callers must handle.
+   */
+  public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) {
+    if (!isPartitioned.get() || origin == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = topologyMap.get().get(myself);
+    if (reachableToMyself == null) {
+      // isPartitioned may still hold the previous value while a concurrent updateTopology() has
+      // already installed a view without this node; such a view is treated as "not partitioned".
+      return origin;
+    }
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  /**
+   * Keeps the entries whose replica set still has usable replicas under the current topology and
+   * re-keys each of them by its trimmed replica set (see {@code getReachableCandidates}).
+   *
+   * <p>If no partition is detected, {@code input} is returned unchanged; otherwise a new set is
+   * returned, which may be empty.
+   */
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
+      Set<Map.Entry<TRegionReplicaSet, T>> input) {
+    if (!isPartitioned.get()) {
+      return input;
+    }
+    final List<TRegionReplicaSet> allSets =
+        input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
+    final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);
+    final Map<TConsensusGroupId, TRegionReplicaSet> newMap = new HashMap<>();
+    candidates.forEach(set -> newMap.put(set.getRegionId(), set));
+    final Map<TRegionReplicaSet, T> candidateMap = new HashMap<>();
+    for (final Map.Entry<TRegionReplicaSet, T> entry : input) {
+      final TConsensusGroupId gid = entry.getKey().getRegionId();
+      final TRegionReplicaSet replicaSet = newMap.get(gid);
+      if (replicaSet != null) {
+        candidateMap.put(replicaSet, entry.getValue());
+      }
+    }
+    return candidateMap.entrySet();
+  }
+
+  /**
+   * Trims {@code all} to the replicas that remain usable under the current topology.
+   *
+   * <p>A DataNode is a candidate when the reachable set recorded for it in the topology map
+   * contains at least one replica of EVERY replica set in {@code all}. Each replica set is then
+   * reduced to the locations hosted on candidate DataNodes (keeping their order), and replica sets
+   * left without any location are dropped.
+   *
+   * @return {@code all} itself if not partitioned or null/empty, else a new list
+   */
+  private List<TRegionReplicaSet> getReachableCandidates(List<TRegionReplicaSet> all) {
+    if (!isPartitioned.get() || all == null || all.isEmpty()) {
+      return all;
+    }
+    final Map<Integer, Set<Integer>> topologyMapCurrent =
+        Collections.unmodifiableMap(this.topologyMap.get());
+    final Map<Integer, TDataNodeLocation> dataNodesCurrent = this.dataNodes.get();
+
+    // brute-force search for DataNodes that can reach at least one replica of ALL replica sets
+    final Set<Integer> dataNodeCandidates = new HashSet<>();
+    for (final Map.Entry<Integer, Set<Integer>> entry : topologyMapCurrent.entrySet()) {
+      if (reachesEveryReplicaSet(entry.getValue(), all)) {
+        dataNodeCandidates.add(entry.getKey());
+      }
+    }
+
+    // keep, in every replica set, only the replicas hosted on a candidate DataNode
+    final List<TRegionReplicaSet> reachableSetCandidates = new ArrayList<>();
+    for (final TRegionReplicaSet replicaSet : all) {
+      final List<TDataNodeLocation> validLocations =
+          replicaSet.getDataNodeLocations().stream()
+              .filter(location -> dataNodeCandidates.contains(location.getDataNodeId()))
+              // prefer the location from the latest view, fall back to the replica's own copy
+              .map(location -> dataNodesCurrent.getOrDefault(location.getDataNodeId(), location))
+              .collect(Collectors.toList());
+      if (!validLocations.isEmpty()) {
+        final TRegionReplicaSet validCandidate =
+            new TRegionReplicaSet(replicaSet.getRegionId(), validLocations);
+        reachableSetCandidates.add(validCandidate);
+      }
+    }
+    return reachableSetCandidates;
+  }
+
+  /**
+   * Returns whether {@code reachable} contains at least one replica of every replica set in {@code
+   * replicaSets}. A {@code null} reachable set reaches nothing.
+   */
+  private static boolean reachesEveryReplicaSet(
+      Set<Integer> reachable, List<TRegionReplicaSet> replicaSets) {
+    if (reachable == null) {
+      return false;
+    }
+    return replicaSets.stream()
+        .allMatch(
+            replicaSet ->
+                replicaSet.getDataNodeLocations().stream()
+                    .anyMatch(location -> reachable.contains(location.getDataNodeId())));
+  }
+
+  /**
+   * Installs the latest topology reported by the ConfigNode.
+   *
+   * @param dataNodes latest DataNode locations, keyed by DataNode id
+   * @param latestTopology reachable DataNode ids recorded for each DataNode id; this DataNode is
+   *     considered partitioned when its own entry is non-empty and its size differs from the
+   *     number of DataNodes in the map
+   */
+  public void updateTopology(
+      final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer, Set<Integer>> latestTopology) {
+    if (!latestTopology.equals(topologyMap.get())) {
+      LOGGER.info("[Topology] latest view from config-node: {}", latestTopology);
+    }
+    this.dataNodes.set(dataNodes);
+    this.topologyMap.set(latestTopology);
+    final Set<Integer> reachableToMyself = latestTopology.get(myself);
+    // A missing or empty entry means the latest topology has no information about this node yet.
+    // This mostly happens right after this node starts, before it has reported its connection
+    // details.
+    final boolean partitioned =
+        reachableToMyself != null
+            && !reachableToMyself.isEmpty()
+            && reachableToMyself.size() != latestTopology.size();
+    this.isPartitioned.set(partitioned);
+    if (partitioned && LOGGER.isDebugEnabled()) {
+      final Set<Integer> unreachable = new HashSet<>(latestTopology.keySet());
+      unreachable.removeAll(reachableToMyself);
+      LOGGER.debug(
+          "This DataNode {} is partitioned with [{}]",
+          myself,
+          unreachable.stream().map(String::valueOf).collect(Collectors.joining(",")));
+    }
+  }
+
+  private ClusterTopology() {
+    this.myself =
+        IoTDBDescriptor.getInstance().getConfig().generateLocalDataNodeLocation().getDataNodeId();
+    this.isPartitioned.set(false);
+    this.topologyMap = new AtomicReference<>(Collections.emptyMap());
+    this.dataNodes = new AtomicReference<>(Collections.emptyMap());
+  }
+
+  private static class ClusterTopologyHolder {
+
+    private static final ClusterTopology INSTANCE = new ClusterTopology();
+
+    private ClusterTopologyHolder() {}
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 3ed0ffa7fe4..ee11364947b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -27,9 +27,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.expression.Expression;
 import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
+import 
org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
@@ -44,6 +46,7 @@ import 
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesSta
 import 
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
@@ -75,6 +78,7 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
 
   // Record FragmentInstances dispatched to same DataNode
   private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap;
+  private final ClusterTopology topology = ClusterTopology.getInstance();
 
   public SimpleFragmentParallelPlanner(
       SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
@@ -152,6 +156,13 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
     // Get the target region for origin PlanFragment, then its instance will 
be distributed one
     // of them.
     TRegionReplicaSet regionReplicaSet = 
fragment.getTargetRegionForTreeModel();
+    if (regionReplicaSet != null
+        && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
+      regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet);
+      if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+        throw new 
ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel());
+      }
+    }
 
     // Set ExecutorType and target host for the instance
     // We need to store all the replica host in case of the scenario that the 
instance need to be
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 52d85471590..1e5c0567825 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,8 +19,10 @@
 
 package org.apache.iotdb.db.queryengine.plan.planner.distribution;
 
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.commons.partition.StorageExecutor;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
 import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
 import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -39,6 +41,7 @@ public class WriteFragmentParallelPlanner implements 
IFragmentParallelPlaner {
   private IAnalysis analysis;
   private MPPQueryContext queryContext;
   private BiFunction<WritePlanNode, IAnalysis, List<WritePlanNode>> 
nodeSplitter;
+  private final ClusterTopology topology = ClusterTopology.getInstance();
 
   public WriteFragmentParallelPlanner(
       SubPlan subPlan, IAnalysis analysis, MPPQueryContext queryContext) {
@@ -80,7 +83,9 @@ public class WriteFragmentParallelPlanner implements 
IFragmentParallelPlaner {
               Long.MAX_VALUE,
               queryContext.getSession());
       if (split.getRegionReplicaSet() != null) {
-        instance.setExecutorAndHost(new 
StorageExecutor(split.getRegionReplicaSet()));
+        final TRegionReplicaSet validSet =
+            topology.getValidatedReplicaSet(split.getRegionReplicaSet());
+        instance.setExecutorAndHost(new StorageExecutor(validSet));
       }
       ret.add(instance);
     }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java
similarity index 55%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java
index 648762f7c4b..1e5a0319256 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java
@@ -17,20 +17,21 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.partition;
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
-/** The interface is used to indicate where to execute a FragmentInstance */
-public interface ExecutorType {
-
-  /** Indicate if ExecutorType is StorageExecutor */
-  boolean isStorageExecutor();
-
-  TDataNodeLocation getDataNodeLocation();
-
-  default TRegionReplicaSet getRegionReplicaSet() {
-    throw new UnsupportedOperationException(getClass().getName());
+/**
+ * Thrown when ALL DataNodeLocations of a QUERY-typed {@link
+ * org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance} are unreachable, possibly due
+ * to a network partition; the query then fails with status {@code PLAN_FAILED_NETWORK_PARTITION}.
+ */
+public class ReplicaSetUnreachableException extends IoTDBRuntimeException {
+  public ReplicaSetUnreachableException(TRegionReplicaSet replicaSet) {
+    super(
+        "All replica cannot be reached:" + replicaSet.toString(),
+        TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
   }
 }
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java
similarity index 52%
copy from 
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
copy to 
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java
index 648762f7c4b..15b6c640aa0 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java
@@ -17,20 +17,23 @@
  * under the License.
  */
 
-package org.apache.iotdb.commons.partition;
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
 
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.rpc.TSStatusCode;
 
-/** The interface is used to indicate where to execute a FragmentInstance */
-public interface ExecutorType {
+import java.util.Collection;
 
-  /** Indicate if ExecutorType is StorageExecutor */
-  boolean isStorageExecutor();
-
-  TDataNodeLocation getDataNodeLocation();
-
-  default TRegionReplicaSet getRegionReplicaSet() {
-    throw new UnsupportedOperationException(getClass().getName());
+/**
+ * Thrown while planning a query when no DataNode can host the root FragmentInstance, i.e. no
+ * DataNode can reach at least one replica of every replica set involved, possibly due to a network
+ * partition; the query then fails with status {@code PLAN_FAILED_NETWORK_PARTITION}.
+ */
+public class RootFIPlacementException extends IoTDBRuntimeException {
+  public RootFIPlacementException(Collection<TRegionReplicaSet> replicaSets) {
+    super(
+        "root FragmentInstance placement error: " + replicaSets.toString(),
+        TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 7a5afed6d8b..69d2a77fe3e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -142,7 +142,7 @@ public class FragmentInstance implements IConsensusRequest {
       return;
     }
     this.executorType = executorType;
-    this.hostDataNode = executorType.getDataNodeLocation();
+    this.hostDataNode = executorType.getDataNodeLocation().orElse(null);
   }
 
   // Although the HostDataNode is set in method setDataRegionAndHost(),
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 0baefe039b5..7d4c752d0ff 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -30,8 +30,10 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
 import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
+import 
org.apache.iotdb.db.queryengine.plan.planner.exceptions.RootFIPlacementException;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -125,6 +127,7 @@ public class TableDistributedPlanGenerator
   private final SymbolAllocator symbolAllocator;
   private final Map<PlanNodeId, OrderingScheme> nodeOrderingMap = new 
HashMap<>();
   private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier 
dataNodeLocationSupplier;
+  private final ClusterTopology topology = ClusterTopology.getInstance();
 
   public TableDistributedPlanGenerator(
       final MPPQueryContext queryContext,
@@ -520,7 +523,7 @@ public class TableDistributedPlanGenerator
     TRegionReplicaSet mostUsedDataRegion = null;
     int maxDeviceEntrySizeOfTableScan = 0;
     for (final Map.Entry<TRegionReplicaSet, DeviceTableScanNode> entry :
-        tableScanNodeMap.entrySet()) {
+        topology.filterReachableCandidates(tableScanNodeMap.entrySet())) {
       final DeviceTableScanNode subDeviceTableScanNode = entry.getValue();
       resultTableScanNodeList.add(subDeviceTableScanNode);
 
@@ -530,6 +533,9 @@ public class TableDistributedPlanGenerator
         maxDeviceEntrySizeOfTableScan = 
subDeviceTableScanNode.getDeviceEntries().size();
       }
     }
+    if (mostUsedDataRegion == null) {
+      throw new RootFIPlacementException(tableScanNodeMap.keySet());
+    }
     context.mostUsedRegion = mostUsedDataRegion;
 
     if (!context.hasSortProperty) {
@@ -636,7 +642,7 @@ public class TableDistributedPlanGenerator
     for (Map.Entry<
             TRegionReplicaSet,
             Pair<TreeAlignedDeviceViewScanNode, 
TreeNonAlignedDeviceViewScanNode>>
-        entry : tableScanNodeMap.entrySet()) {
+        entry : 
topology.filterReachableCandidates(tableScanNodeMap.entrySet())) {
       TRegionReplicaSet regionReplicaSet = entry.getKey();
       Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode> 
pair = entry.getValue();
       int currentDeviceEntrySize = 0;
@@ -656,6 +662,9 @@ public class TableDistributedPlanGenerator
         maxDeviceEntrySizeOfTableScan = currentDeviceEntrySize;
       }
     }
+    if (mostUsedDataRegion == null) {
+      throw new RootFIPlacementException(tableScanNodeMap.keySet());
+    }
     context.mostUsedRegion = mostUsedDataRegion;
 
     if (!context.hasSortProperty) {
@@ -813,7 +822,8 @@ public class TableDistributedPlanGenerator
     List<PlanNode> resultTableScanNodeList = new ArrayList<>();
     TRegionReplicaSet mostUsedDataRegion = null;
     int maxDeviceEntrySizeOfTableScan = 0;
-    for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry : 
regionNodeMap.entrySet()) {
+    for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry :
+        topology.filterReachableCandidates(regionNodeMap.entrySet())) {
       DeviceTableScanNode subDeviceTableScanNode = entry.getValue();
       resultTableScanNodeList.add(subDeviceTableScanNode);
 
@@ -823,6 +833,9 @@ public class TableDistributedPlanGenerator
         maxDeviceEntrySizeOfTableScan = 
subDeviceTableScanNode.getDeviceEntries().size();
       }
     }
+    if (mostUsedDataRegion == null) {
+      throw new RootFIPlacementException(regionNodeMap.keySet());
+    }
     context.mostUsedRegion = mostUsedDataRegion;
 
     if (context.hasSortProperty) {
@@ -1324,6 +1337,9 @@ public class TableDistributedPlanGenerator
           maxDeviceEntrySizeOfTableScan = 
subTableDeviceFetchNode.getDeviceIdList().size();
         }
       }
+      if (mostUsedSchemaRegion == null) {
+        throw new RootFIPlacementException(tableDeviceFetchMap.keySet());
+      }
       context.mostUsedRegion = mostUsedSchemaRegion;
       return res;
     }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index 8141acbe3a3..2d152dc49f5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -29,8 +29,10 @@ import 
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
 import 
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import 
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
+import 
org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
@@ -43,6 +45,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
 
+import org.apache.commons.collections4.CollectionUtils;
 import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,6 +77,7 @@ public class TableModelQueryFragmentPlanner {
 
   // Record FragmentInstances dispatched to same DataNode
   private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap = 
new HashMap<>();
+  private final ClusterTopology topology = ClusterTopology.getInstance();
 
   private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
 
@@ -124,6 +128,14 @@ public class TableModelQueryFragmentPlanner {
     // Get the target region for origin PlanFragment, then its instance will 
be distributed one
     // of them.
     TRegionReplicaSet regionReplicaSet = 
fragment.getTargetRegionForTableModel(nodeDistributionMap);
+    if (regionReplicaSet != null
+        && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
+      regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet);
+      if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+        throw new ReplicaSetUnreachableException(
+            fragment.getTargetRegionForTableModel(nodeDistributionMap));
+      }
+    }
 
     // Set ExecutorType and target host for the instance,
     // We need to store all the replica host in case of the scenario that the 
instance need to be
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index db0c270d176..5aef2e3176c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -204,6 +204,11 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
     List<FragmentInstance> localInstances = new ArrayList<>();
     List<FragmentInstance> remoteInstances = new ArrayList<>();
     for (FragmentInstance instance : instances) {
+      if (instance.getHostDataNode() == null) {
+        dataNodeFailureList.add(
+            new 
TSStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()));
+        continue;
+      }
       TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
       if (isDispatchedToLocal(endPoint)) {
         localInstances.add(instance);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 38d4d86d24e..49a8086d635 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -30,6 +30,8 @@ import 
org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
 import org.apache.iotdb.db.exception.sql.SemanticException;
 import org.apache.iotdb.db.protocol.thrift.OperationType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
+import 
org.apache.iotdb.db.queryengine.plan.planner.exceptions.RootFIPlacementException;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
@@ -111,7 +113,8 @@ public class ErrorHandlingUtils {
             || status.getCode() == 
TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()
             || status.getCode() == 
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()
             || status.getCode() == 
TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
-            || status.getCode() == 
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()) {
+            || status.getCode() == 
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()
+            || status.getCode() == 
TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()) {
           LOGGER.info(message);
         } else {
           LOGGER.warn(message, e);
@@ -151,6 +154,10 @@ public class ErrorHandlingUtils {
     } else if (t instanceof QueryInBatchStatementException) {
       return RpcUtils.getStatus(
           TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR + 
rootCause.getMessage());
+    } else if (t instanceof RootFIPlacementException) {
+      return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, 
rootCause.getMessage());
+    } else if (t instanceof ReplicaSetUnreachableException) {
+      return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION, 
rootCause.getMessage());
     } else if (t instanceof IoTDBException) {
       return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(), 
rootCause.getMessage());
     } else if (t instanceof TsFileRuntimeException) {
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 169c07d59d9..fad0efa75b8 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -131,6 +131,7 @@ public class ClientPoolFactory {
                       
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
                       
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
                       
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+                      .setPrintLogWhenEncounterException(false)
                       .build(),
                   ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
               new 
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index aa1ca3fe4d4..8053b677c78 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -118,6 +118,14 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
       final AsyncRequestContext<?, ?, RequestType, NodeLocation> 
requestContext,
       final int retryNum,
       final Long timeoutInMs) {
+    sendAsyncRequest(requestContext, retryNum, timeoutInMs, false);
+  }
+
+  public void sendAsyncRequest(
+      final AsyncRequestContext<?, ?, RequestType, NodeLocation> 
requestContext,
+      final int retryNum,
+      final Long timeoutInMs,
+      final boolean keepSilent) {
     if (requestContext.getRequestIndices().isEmpty()) {
       return;
     }
@@ -155,7 +163,7 @@ public abstract class AsyncRequestManager<RequestType, 
NodeLocation, Client> {
       }
     }
 
-    if (!requestContext.getRequestIndices().isEmpty()) {
+    if (!requestContext.getRequestIndices().isEmpty() && !keepSilent) {
       LOGGER.warn(
           "Failed to {} after {} retries, requestIndices: {}",
           requestType,
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
new file mode 100644
index 00000000000..f00de855eb8
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.request;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import 
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+
+/**
+ * Base class for asynchronous request managers whose targets are DataNodes and whose clients come
+ * from the DataNode heartbeat client pool ({@code
+ * ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory}), not from a general-purpose
+ * internal-service pool.
+ *
+ * <p>NOTE(review): presumably intended for lightweight DataNode-to-DataNode connectivity probes;
+ * confirm against the concrete subclasses.
+ *
+ * @param <RequestType> the request-type parameter forwarded to {@link AsyncRequestManager}
+ */
+public abstract class DataNodeIntraHeartbeatRequestManager<RequestType>
+    extends AsyncRequestManager<
+        RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> {
+
+  /**
+   * Creates {@code clientManager}, keyed by {@link TEndPoint} and backed by the heartbeat client
+   * pool factory.
+   */
+  @Override
+  protected void initClientManager() {
+    clientManager =
+        new IClientManager.Factory<TEndPoint, 
AsyncDataNodeInternalServiceClient>()
+            .createClientManager(
+                new 
ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
+  }
+
+  /** Maps a DataNode location to the endpoint requests are sent to: its internal endpoint. */
+  @Override
+  protected TEndPoint nodeLocationToEndPoint(TDataNodeLocation 
dataNodeLocation) {
+    return dataNodeLocation.getInternalEndPoint();
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
index 310c74e60a1..33dcb712a20 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
@@ -67,7 +67,8 @@ public class TestConnectionUtils {
         .forEach(
             (nodeId, status) -> {
               TEndPoint endPoint = 
getEndPoint.apply(anotherNodeLocationMap.get(nodeId));
-              TServiceProvider serviceProvider = new 
TServiceProvider(endPoint, serviceType);
+              TServiceProvider serviceProvider =
+                  new TServiceProvider(endPoint, serviceType, nodeId);
               TTestConnectionResult result = new TTestConnectionResult();
               result.setSender(sender);
               result.setServiceProvider(serviceProvider);
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3965622078c..577b9bb795b 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -82,6 +82,7 @@ public enum ThreadName {
   
CONFIG_NODE_SIMPLE_CONSENSUS_WAL_FLUSH("ConfigNode-Simple-Consensus-WAL-Flush-Thread"),
   // -------------------------- ConfigNode-Heartbeat --------------------------
   CONFIG_NODE_HEART_BEAT_SERVICE("Cluster-Heartbeat-Service"),
+  CONFIG_NODE_TOPOLOGY_SERVICE("Topology-Service"),
   
ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL("AsyncConfigNodeHeartbeatServiceClientPool"),
   
ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL("AsyncDataNodeHeartbeatServiceClientPool"),
   // -------------------------- ConfigNode-LoadBalance 
--------------------------
@@ -340,6 +341,7 @@ public enum ThreadName {
       new HashSet<>(
           Arrays.asList(
               CONFIG_NODE_HEART_BEAT_SERVICE,
+              CONFIG_NODE_TOPOLOGY_SERVICE,
               ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL,
               ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL));
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
index 648762f7c4b..1fe95764287 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
@@ -22,13 +22,19 @@ package org.apache.iotdb.commons.partition;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
+import java.util.Optional;
+
 /** The interface is used to indicate where to execute a FragmentInstance */
 public interface ExecutorType {
 
   /** Indicate if ExecutorType is StorageExecutor */
   boolean isStorageExecutor();
 
-  TDataNodeLocation getDataNodeLocation();
+  /**
+   * @return Optional.empty() iff {@link #isStorageExecutor()} and all 
candidate replica locations
+   *     are unreachable
+   */
+  Optional<TDataNodeLocation> getDataNodeLocation();
 
   default TRegionReplicaSet getRegionReplicaSet() {
     throw new UnsupportedOperationException(getClass().getName());
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
index 4255c3955d8..7c53d7631f4 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
@@ -22,7 +22,10 @@ package org.apache.iotdb.commons.partition;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
+import org.apache.tsfile.utils.Preconditions;
+
 import java.util.Objects;
+import java.util.Optional;
 
 /** QueryExecutor indicates this query can execute directly without data from 
StorageEngine */
 public class QueryExecutor implements ExecutorType {
@@ -33,8 +36,9 @@ public class QueryExecutor implements ExecutorType {
   }
 
   @Override
-  public TDataNodeLocation getDataNodeLocation() {
-    return dataNodeLocation;
+  // Returns the fixed host location of this executor. It is always present, because the
+  // ExecutorType contract allows an empty result only for storage executors.
+  public Optional<TDataNodeLocation> getDataNodeLocation() {
+    // NOTE(review): this validates the object's own field, not a method argument, so a state
+    // check or Objects.requireNonNull with a descriptive message would report the failure more
+    // accurately. As written, the failure carries no message.
+    Preconditions.checkArgument(dataNodeLocation != null);
+    return Optional.of(dataNodeLocation);
   }
 
   @Override
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
index a99b4dea07e..4c256962524 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /** StorageExecutor indicates execution of this query need data from 
StorageEngine */
 public class StorageExecutor implements ExecutorType {
@@ -35,8 +37,12 @@ public class StorageExecutor implements ExecutorType {
   }
 
   @Override
-  public TDataNodeLocation getDataNodeLocation() {
-    return regionReplicaSet.getDataNodeLocations().get(0);
+  // Returns the first location in the region replica set, or Optional.empty() when the set has
+  // no locations. Per the ExecutorType contract, an empty result means every candidate replica
+  // location was unreachable, presumably after the set was filtered by reachability
+  // (TODO confirm against callers).
+  // NOTE(review): this method never returns null, so the @Nullable annotation below contradicts
+  // the Optional return type. It should be removed, together with the then-unused
+  // javax.annotation.Nullable import.
+  @Nullable
+  public Optional<TDataNodeLocation> getDataNodeLocation() {
+    if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+      return Optional.empty();
+    }
+    return Optional.of(regionReplicaSet.getDataNodeLocations().get(0));
   }
 
   @Override
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift 
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index 8c2b8cedb46..161c8525cca 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -229,6 +229,7 @@ enum TServiceType {
 struct TServiceProvider {
   1: required TEndPoint endPoint
   2: required TServiceType serviceType
+  3: required i32 nodeId
 }
 
 struct TSender {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift 
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 34ec8f848c4..7008a983428 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -277,6 +277,8 @@ struct TDataNodeHeartbeatReq {
   9: optional i64 deviceQuotaRemain
   10: optional TDataNodeActivation activation
   11: optional set<common.TEndPoint> configNodeEndPoints
+  12: optional map<i32, common.TDataNodeLocation> dataNodes
+  13: optional map<i32, set<i32>> topology
 }
 
 struct TDataNodeActivation {
@@ -1177,6 +1179,8 @@ service IDataNodeRPCService {
 
   common.TTestConnectionResp submitTestConnectionTask(common.TNodeLocations 
nodeLocations)
 
+  common.TTestConnectionResp 
submitInternalTestConnectionTask(common.TNodeLocations nodeLocations)
+
   /** Empty rpc, only for connection test */
   common.TSStatus testConnectionEmptyRPC()
 }

Reply via email to