This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new af03c1692b4 [RTO/RPO] Topology awareness for query plan (#15014)
af03c1692b4 is described below
commit af03c1692b4da4016cc7a4304c7f2e18053c1562
Author: William Song <[email protected]>
AuthorDate: Sat Mar 22 17:03:16 2025 +0800
[RTO/RPO] Topology awareness for query plan (#15014)
* temp save for CI
* save topology service
* fix ut and it
* fix ut and it
* fix ut and it
* fix ut and it
* address review issues
* address review issues
* spotless
* solve more conflicts and do regression test
* spotless
* fixing the ut and it
* fixing the ut and it
* address review issues
* address review issues
* address review issues
* address review issues
---
.../org/apache/iotdb/db/it/utils/TestUtils.java | 8 +
.../java/org/apache/iotdb/rpc/TSStatusCode.java | 1 +
.../client/async/CnToDnAsyncRequestType.java | 1 +
.../CnToDnInternalServiceAsyncRequestManager.java | 5 +
.../rpc/DataNodeAsyncRequestRPCHandler.java | 1 +
.../iotdb/confignode/manager/ClusterManager.java | 12 +-
.../iotdb/confignode/manager/load/LoadManager.java | 10 +-
.../manager/load/cache/AbstractLoadCache.java | 2 +-
.../manager/load/cache/IFailureDetector.java | 5 +-
.../confignode/manager/load/cache/LoadCache.java | 29 +-
.../manager/load/cache/detector/FixedDetector.java | 2 +-
.../load/cache/detector/PhiAccrualDetector.java | 19 +-
.../load/cache/node/AINodeHeartbeatCache.java | 2 +-
.../load/cache/node/ConfigNodeHeartbeatCache.java | 2 +-
.../load/cache/node/DataNodeHeartbeatCache.java | 2 +-
.../manager/load/cache/region/RegionCache.java | 9 +-
.../load/cache/region/RegionGroupCache.java | 15 +-
.../manager/load/service/EventService.java | 11 +-
.../manager/load/service/HeartbeatService.java | 8 +
.../manager/load/service/TopologyService.java | 297 +++++++++++++++++++++
.../load/subscriber/IClusterStatusSubscriber.java | 6 +-
.../load/subscriber/NodeStatisticsChangeEvent.java | 3 +-
.../runtime/PipeLeaderChangeHandler.java | 12 -
.../runtime/PipeRuntimeCoordinator.java | 12 -
.../manager/load/cache/RegionGroupCacheTest.java | 19 +-
.../manager/load/cache/detector/DetectorTest.java | 21 +-
.../client/dn/AsyncTSStatusRPCHandler.java | 25 +-
.../client/dn/DataNodeAsyncRequestRPCHandler.java | 20 +-
.../client/dn/DataNodeIntraHeartbeatManager.java | 59 ++++
.../impl/DataNodeInternalRPCServiceImpl.java | 28 ++
.../iotdb/db/queryengine/plan/ClusterTopology.java | 175 ++++++++++++
.../SimpleFragmentParallelPlanner.java | 11 +
.../distribution/WriteFragmentParallelPlanner.java | 7 +-
.../ReplicaSetUnreachableException.java} | 25 +-
.../exceptions/RootFIPlacementException.java} | 25 +-
.../plan/planner/plan/FragmentInstance.java | 2 +-
.../distribute/TableDistributedPlanGenerator.java | 22 +-
.../distribute/TableModelQueryFragmentPlanner.java | 12 +
.../scheduler/FragmentInstanceDispatcherImpl.java | 5 +
.../apache/iotdb/db/utils/ErrorHandlingUtils.java | 9 +-
.../iotdb/commons/client/ClientPoolFactory.java | 1 +
.../client/request/AsyncRequestManager.java | 10 +-
.../DataNodeIntraHeartbeatRequestManager.java | 44 +++
.../client/request/TestConnectionUtils.java | 3 +-
.../iotdb/commons/concurrent/ThreadName.java | 2 +
.../iotdb/commons/partition/ExecutorType.java | 8 +-
.../iotdb/commons/partition/QueryExecutor.java | 8 +-
.../iotdb/commons/partition/StorageExecutor.java | 10 +-
.../thrift-commons/src/main/thrift/common.thrift | 1 +
.../src/main/thrift/datanode.thrift | 4 +
50 files changed, 906 insertions(+), 124 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
index 2a64278b1e8..fef5dc2e469 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/utils/TestUtils.java
@@ -20,12 +20,14 @@
package org.apache.iotdb.db.it.utils;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.isession.SessionConfig;
import org.apache.iotdb.isession.SessionDataSet;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.itbase.env.BaseEnv;
+import org.apache.iotdb.itbase.env.BaseNodeWrapper;
import org.apache.iotdb.rpc.IoTDBConnectionException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.StatementExecutionException;
@@ -43,6 +45,7 @@ import java.sql.Statement;
import java.text.DateFormat;
import java.time.ZoneId;
import java.time.ZoneOffset;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -1706,6 +1709,11 @@ public class TestUtils {
long retryIntervalMS = 1000;
while (true) {
try (Connection connection = EnvFactory.getEnv().getConnection()) {
+ final List<BaseNodeWrapper> allDataNodes =
+ new ArrayList<>(EnvFactory.getEnv().getDataNodeWrapperList());
+ EnvFactory.getEnv()
+ .ensureNodeStatus(
+ allDataNodes, Collections.nCopies(allDataNodes.size(), NodeStatus.Running));
break;
} catch (Exception e) {
try {
diff --git
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index b68215cd37f..58c9de7502c 100644
---
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -136,6 +136,7 @@ public enum TSStatusCode {
QUERY_EXECUTION_MEMORY_NOT_ENOUGH(719),
QUERY_TIMEOUT(720),
+ PLAN_FAILED_NETWORK_PARTITION(721),
// Arithmetic
NUMERIC_VALUE_OUT_OF_RANGE(750),
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index 485bfe955a1..236a190812f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -32,6 +32,7 @@ public enum CnToDnAsyncRequestType {
SET_SYSTEM_STATUS,
SET_CONFIGURATION,
SUBMIT_TEST_CONNECTION_TASK,
+ SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
TEST_CONNECTION,
// Region Maintenance
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 0439e5233f2..3578168ea4b 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -380,6 +380,11 @@ public class CnToDnInternalServiceAsyncRequestManager
(req, client, handler) ->
client.submitTestConnectionTask(
(TNodeLocations) req, (SubmitTestConnectionTaskRPCHandler)
handler));
+ actionMapBuilder.put(
+ CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+ (req, client, handler) ->
+ client.submitInternalTestConnectionTask(
+ (TNodeLocations) req, (SubmitTestConnectionTaskRPCHandler) handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.TEST_CONNECTION,
(req, client, handler) ->
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index f926589f9cc..6b0746f47e0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -174,6 +174,7 @@ public abstract class
DataNodeAsyncRequestRPCHandler<Response>
(Map<Integer, TRegionLeaderChangeResp>) responseMap,
countDownLatch);
case SUBMIT_TEST_CONNECTION_TASK:
+ case SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK:
return new SubmitTestConnectionTaskRPCHandler(
requestType,
requestId,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
index 0f79c50a49e..0bfc6df39e2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ClusterManager.java
@@ -249,7 +249,8 @@ public class ClusterManager {
location -> {
TEndPoint endPoint = location.getInternalEndPoint();
TServiceProvider serviceProvider =
- new TServiceProvider(endPoint, TServiceType.ConfigNodeInternalService);
+ new TServiceProvider(
+ endPoint, TServiceType.ConfigNodeInternalService, location.getConfigNodeId());
TTestConnectionResult result =
new
TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
result.setSuccess(false).setReason(errorMessage);
@@ -261,7 +262,8 @@ public class ClusterManager {
location -> {
TEndPoint endPoint = location.getInternalEndPoint();
TServiceProvider serviceProvider =
- new TServiceProvider(endPoint, TServiceType.DataNodeInternalService);
+ new TServiceProvider(
+ endPoint, TServiceType.DataNodeInternalService, location.getDataNodeId());
TTestConnectionResult result =
new
TTestConnectionResult().setServiceProvider(serviceProvider).setSender(sender);
result.setSuccess(false).setReason(errorMessage);
@@ -274,7 +276,8 @@ public class ClusterManager {
location -> {
TEndPoint endPoint = location.getMPPDataExchangeEndPoint();
TServiceProvider serviceProvider =
- new TServiceProvider(endPoint, TServiceType.DataNodeMPPService);
+ new TServiceProvider(
+ endPoint, TServiceType.DataNodeMPPService, location.getDataNodeId());
TTestConnectionResult result =
new TTestConnectionResult()
.setServiceProvider(serviceProvider)
@@ -288,7 +291,8 @@ public class ClusterManager {
location -> {
TEndPoint endPoint = location.getClientRpcEndPoint();
TServiceProvider serviceProvider =
- new TServiceProvider(endPoint, TServiceType.DataNodeExternalService);
+ new TServiceProvider(
+ endPoint, TServiceType.DataNodeExternalService, location.getDataNodeId());
TTestConnectionResult result =
new TTestConnectionResult()
.setServiceProvider(serviceProvider)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 921dd2cd9ae..54dd582551d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -44,6 +44,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSamp
import org.apache.iotdb.confignode.manager.load.service.EventService;
import org.apache.iotdb.confignode.manager.load.service.HeartbeatService;
import org.apache.iotdb.confignode.manager.load.service.StatisticsService;
+import org.apache.iotdb.confignode.manager.load.service.TopologyService;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import org.apache.iotdb.confignode.rpc.thrift.TTimeSlotList;
@@ -72,6 +73,7 @@ public class LoadManager {
protected HeartbeatService heartbeatService;
private final StatisticsService statisticsService;
private final EventService eventService;
+ private final TopologyService topologyService;
public LoadManager(IManager configManager) {
this.configManager = configManager;
@@ -83,7 +85,11 @@ public class LoadManager {
this.loadCache = new LoadCache();
setHeartbeatService(configManager, loadCache);
this.statisticsService = new StatisticsService(loadCache);
- this.eventService = new EventService(configManager, loadCache, routeBalancer);
+ this.topologyService = new TopologyService(configManager, loadCache::updateTopology);
+ this.eventService = new EventService(loadCache);
+ this.eventService.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
+ this.eventService.register(routeBalancer);
+ this.eventService.register(topologyService);
}
protected void setHeartbeatService(IManager configManager, LoadCache
loadCache) {
@@ -146,6 +152,7 @@ public class LoadManager {
statisticsService.startLoadStatisticsService();
eventService.startEventService();
partitionBalancer.setupPartitionBalancer();
+ topologyService.startTopologyService();
}
public void stopLoadServices() {
@@ -155,6 +162,7 @@ public class LoadManager {
loadCache.clearHeartbeatCache();
partitionBalancer.clearPartitionBalancer();
routeBalancer.clearRegionPriority();
+ topologyService.stopTopologyService();
}
public void clearDataPartitionPolicyTable(String database) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
index 6355dfdcc82..d61a0043520 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/AbstractLoadCache.java
@@ -57,7 +57,7 @@ public abstract class AbstractLoadCache {
CONF.getFailureDetectorPhiThreshold(),
CONF.getFailureDetectorPhiAcceptablePauseInMs() * 1000_000L,
CONF.getHeartbeatIntervalInMs() * 200_000L,
- 60,
+ IFailureDetector.PHI_COLD_START_THRESHOLD,
new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs()
* 1000_000L));
break;
case IFailureDetector.FIXED_DETECTOR:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
index be6a8f621ba..4ab397f95ad 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/IFailureDetector.java
@@ -29,11 +29,14 @@ public interface IFailureDetector {
String FIXED_DETECTOR = "fixed";
String PHI_ACCRUAL_DETECTOR = "phi_accrual";
+ int PHI_COLD_START_THRESHOLD = 60;
+
/**
* Given the heartbeat history, decide whether this endpoint is still
available
*
+ * @param id the unique identifier of the history owner
* @param history heartbeat history
* @return false if the endpoint is under failure
*/
- boolean isAvailable(List<AbstractHeartbeatSample> history);
+ boolean isAvailable(Object id, List<AbstractHeartbeatSample> history);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index af41555093e..666c676abdf 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -49,12 +49,15 @@ import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSamp
import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
+import org.apache.thrift.annotation.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -94,6 +97,8 @@ public class LoadCache {
private final Map<TConsensusGroupId, ConsensusGroupCache>
consensusGroupCacheMap;
// Map<DataNodeId, confirmedConfigNodes>
private final Map<Integer, Set<TEndPoint>> confirmedConfigNodeMap;
+ private Map<Integer, Set<Integer>> topologyGraph;
+ private final AtomicBoolean topologyUpdated;
public LoadCache() {
this.nodeCacheMap = new ConcurrentHashMap<>();
@@ -102,6 +107,8 @@ public class LoadCache {
this.regionSizeMap = new ConcurrentHashMap<>();
this.consensusGroupCacheMap = new ConcurrentHashMap<>();
this.confirmedConfigNodeMap = new ConcurrentHashMap<>();
+ this.topologyGraph = new HashMap<>();
+ this.topologyUpdated = new AtomicBoolean(false);
}
public void initHeartbeatCache(final IManager configManager) {
@@ -175,6 +182,7 @@ public class LoadCache {
regionGroupId,
new RegionGroupCache(
database,
+ regionGroupId,
regionReplicaSet.getDataNodeLocations().stream()
.map(TDataNodeLocation::getDataNodeId)
.collect(Collectors.toSet()),
@@ -287,7 +295,8 @@ public class LoadCache {
String database, TConsensusGroupId regionGroupId, Set<Integer>
dataNodeIds) {
boolean isStrongConsistency =
CONF.isConsensusGroupStrongConsistency(regionGroupId);
regionGroupCacheMap.put(
- regionGroupId, new RegionGroupCache(database, dataNodeIds, isStrongConsistency));
+ regionGroupId,
+ new RegionGroupCache(database, regionGroupId, dataNodeIds, isStrongConsistency));
consensusGroupCacheMap.put(regionGroupId, new ConsensusGroupCache());
}
@@ -299,7 +308,7 @@ public class LoadCache {
*/
public void createRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
Optional.ofNullable(regionGroupCacheMap.get(regionGroupId))
- .ifPresent(cache -> cache.createRegionCache(dataNodeId));
+ .ifPresent(cache -> cache.createRegionCache(dataNodeId, regionGroupId));
}
/**
@@ -769,6 +778,22 @@ public class LoadCache {
regionGroupIds);
}
+ public void updateTopology(Map<Integer, Set<Integer>> latestTopology) {
+ if (!latestTopology.equals(topologyGraph)) {
+ LOGGER.info("[Topology Service] Cluster topology changed, latest: {}", latestTopology);
+ }
+ topologyGraph = latestTopology;
+ topologyUpdated.set(true);
+ }
+
+ @Nullable
+ public Map<Integer, Set<Integer>> getTopology() {
+ if (topologyUpdated.compareAndSet(true, false)) {
+ return Collections.unmodifiableMap(topologyGraph);
+ }
+ return null;
+ }
+
public void updateConfirmedConfigNodeEndPoints(
int dataNodeId, Set<TEndPoint> configNodeEndPoints) {
confirmedConfigNodeMap.put(dataNodeId, configNodeEndPoints);
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
index 35fb4e3f2a1..b9183e62151 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/FixedDetector.java
@@ -41,7 +41,7 @@ public class FixedDetector implements IFailureDetector {
}
@Override
- public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+ public boolean isAvailable(Object id, List<AbstractHeartbeatSample> history) {
final AbstractHeartbeatSample lastSample =
history.isEmpty() ? null : history.get(history.size() - 1);
if (lastSample != null) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
index 39cc45c1519..cef58c77df0 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/detector/PhiAccrualDetector.java
@@ -24,6 +24,8 @@ import
org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.tsfile.utils.Preconditions;
import org.slf4j.Logger;
@@ -31,6 +33,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.TimeUnit;
/**
* The Phi Failure Detector, proposed by Hayashibara, Naohiro, et al. "The/spl
phi/accrual failure
@@ -49,6 +52,8 @@ public class PhiAccrualDetector implements IFailureDetector {
private final long minHeartbeatStdNs;
private final int codeStartSampleCount;
private final IFailureDetector fallbackDuringColdStart;
+ /* We are using cache here to avoid managing entry life cycles manually */
+ private final Cache<Object, Boolean> availibilityCache;
public PhiAccrualDetector(
long threshold,
@@ -61,17 +66,23 @@ public class PhiAccrualDetector implements IFailureDetector
{
this.minHeartbeatStdNs = minHeartbeatStdNs;
this.codeStartSampleCount = minimalSampleCount;
this.fallbackDuringColdStart = fallbackDuringColdStart;
+ this.availibilityCache =
+ CacheBuilder.newBuilder().expireAfterAccess(5, TimeUnit.MINUTES).build();
}
@Override
- public boolean isAvailable(List<AbstractHeartbeatSample> history) {
+ public boolean isAvailable(Object id, List<AbstractHeartbeatSample> history) {
if (history.size() < codeStartSampleCount) {
/* We haven't received enough heartbeat replies.*/
- return fallbackDuringColdStart.isAvailable(history);
+ return fallbackDuringColdStart.isAvailable(id, history);
}
final PhiAccrual phiAccrual = create(history);
final boolean isAvailable = phiAccrual.phi() < (double) this.threshold;
- if (!isAvailable && LOGGER.isDebugEnabled()) {
+
+ final Boolean previousAvailability = availibilityCache.getIfPresent(id);
+ availibilityCache.put(id, isAvailable);
+
+ if (Boolean.TRUE.equals(previousAvailability) && !isAvailable) {
// log the status change and dump the heartbeat history for analysis use
final StringBuilder builder = new StringBuilder();
builder.append("[");
@@ -81,7 +92,7 @@ public class PhiAccrualDetector implements IFailureDetector {
}
builder.append(phiAccrual.timeElapsedSinceLastHeartbeat / 1000_000);
builder.append("]");
- LOGGER.debug(String.format("Node Down, heartbeat history (ms): %s", builder));
+ LOGGER.info(String.format("Node Down, heartbeat history (ms): %s", builder));
}
return isAvailable;
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
index 187c35802af..ca15f9cb515 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/AINodeHeartbeatCache.java
@@ -54,7 +54,7 @@ public class AINodeHeartbeatCache extends BaseNodeCache {
if (lastSample != null &&
NodeStatus.Removing.equals(lastSample.getStatus())) {
status = NodeStatus.Removing;
- } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
/* Failure detector decides that this AINode is UNKNOWN */
status = NodeStatus.Unknown;
} else if (lastSample != null) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
index a4b8051cb74..4d675e1e8a4 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/ConfigNodeHeartbeatCache.java
@@ -66,7 +66,7 @@ public class ConfigNodeHeartbeatCache extends BaseNodeCache {
if (lastSample == null) {
/* First heartbeat not received from this ConfigNode, status is
UNKNOWN */
status = NodeStatus.Unknown;
- } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
/* Failure detector decides that this ConfigNode is UNKNOWN */
status = NodeStatus.Unknown;
} else {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
index 87dccb1465d..2e3e8906e74 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/node/DataNodeHeartbeatCache.java
@@ -68,7 +68,7 @@ public class DataNodeHeartbeatCache extends BaseNodeCache {
if (lastSample == null) {
/* First heartbeat not received from this DataNode, status is UNKNOWN
*/
status = NodeStatus.Unknown;
- } else if (!failureDetector.isAvailable(heartbeatHistory)) {
+ } else if (!failureDetector.isAvailable(nodeId, heartbeatHistory)) {
/* Failure detector decides that this DataNode is UNKNOWN */
status = NodeStatus.Unknown;
} else {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index 9210585428d..a374c923f24 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -19,10 +19,13 @@
package org.apache.iotdb.confignode.manager.load.cache.region;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
import org.apache.iotdb.confignode.manager.load.cache.AbstractLoadCache;
+import org.apache.tsfile.utils.Pair;
+
import java.util.Collections;
import java.util.List;
@@ -31,9 +34,11 @@ import java.util.List;
* statistics of the Region based on the latest RegionHeartbeatSample.
*/
public class RegionCache extends AbstractLoadCache {
+ private final Pair<Integer, TConsensusGroupId> id;
- public RegionCache() {
+ public RegionCache(int dataNodeId, TConsensusGroupId gid) {
super();
+ this.id = new Pair<>(dataNodeId, gid);
this.currentStatistics.set(RegionStatistics.generateDefaultRegionStatistics());
}
@@ -50,7 +55,7 @@ public class RegionCache extends AbstractLoadCache {
if (lastSample == null) {
/* First heartbeat not received from this region, status is UNKNOWN */
status = RegionStatus.Unknown;
- } else if (!failureDetector.isAvailable(history)) {
+ } else if (!failureDetector.isAvailable(id, history)) {
/* Failure detector decides that this region is UNKNOWN */
status = RegionStatus.Unknown;
} else {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index 0ee3f7d2cf8..e441054b1df 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.confignode.manager.load.cache.region;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
@@ -36,7 +37,6 @@ import java.util.concurrent.atomic.AtomicReference;
* from all Regions it contains.
*/
public class RegionGroupCache {
-
private final String database;
// Map<DataNodeId(where a RegionReplica resides in), RegionCache>
private final Map<Integer, RegionCache> regionCacheMap;
@@ -45,10 +45,15 @@ public class RegionGroupCache {
private final boolean isStrongConsistency;
/** Constructor for create RegionGroupCache with default
RegionGroupStatistics. */
- public RegionGroupCache(String database, Set<Integer> dataNodeIds, boolean
isStrongConsistency) {
+ public RegionGroupCache(
+ String database,
+ TConsensusGroupId groupId,
+ Set<Integer> dataNodeIds,
+ boolean isStrongConsistency) {
this.database = database;
this.regionCacheMap = new ConcurrentHashMap<>();
- dataNodeIds.forEach(dataNodeId -> regionCacheMap.put(dataNodeId, new RegionCache()));
+ dataNodeIds.forEach(
+ dataNodeId -> regionCacheMap.put(dataNodeId, new RegionCache(dataNodeId, groupId)));
this.currentStatistics =
new
AtomicReference<>(RegionGroupStatistics.generateDefaultRegionGroupStatistics());
this.isStrongConsistency = isStrongConsistency;
@@ -78,8 +83,8 @@ public class RegionGroupCache {
*
* @param dataNodeId the specified DataNode
*/
- public void createRegionCache(int dataNodeId) {
- regionCacheMap.put(dataNodeId, new RegionCache());
+ public void createRegionCache(int dataNodeId, TConsensusGroupId groupId) {
+ regionCacheMap.put(dataNodeId, new RegionCache(dataNodeId, groupId));
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
index 5f6035f0bca..52099bdfac3 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/EventService.java
@@ -25,13 +25,12 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
-import org.apache.iotdb.confignode.manager.IManager;
-import org.apache.iotdb.confignode.manager.load.balancer.RouteBalancer;
import org.apache.iotdb.confignode.manager.load.cache.LoadCache;
import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupStatistics;
import
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
+import org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
import
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
@@ -75,7 +74,7 @@ public class EventService {
previousConsensusGroupStatisticsMap;
private final EventBus eventPublisher;
- public EventService(IManager configManager, LoadCache loadCache,
RouteBalancer routeBalancer) {
+ public EventService(LoadCache loadCache) {
this.loadCache = loadCache;
this.previousNodeStatisticsMap = new TreeMap<>();
this.previousRegionGroupStatisticsMap = new TreeMap<>();
@@ -86,8 +85,10 @@ public class EventService {
ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName(),
IoTDBThreadPoolFactory.newFixedThreadPool(
5, ThreadName.CONFIG_NODE_LOAD_PUBLISHER.getName()));
-
eventPublisher.register(configManager.getPipeManager().getPipeRuntimeCoordinator());
- eventPublisher.register(routeBalancer);
+ }
+
+ public void register(final IClusterStatusSubscriber listener) {
+ this.eventPublisher.register(listener);
}
/** Start the event service. */
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
index c713b0eb0a3..ee6bed4c9e9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/HeartbeatService.java
@@ -48,6 +48,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
+import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Future;
@@ -161,6 +162,13 @@ public class HeartbeatService {
heartbeatReq.setSpaceQuotaUsage(configManager.getClusterQuotaManager().getSpaceQuotaUsage());
}
+ final Map<Integer, Set<Integer>> topologyMap =
+ configManager.getLoadManager().getLoadCache().getTopology();
+ if (topologyMap != null) {
+ heartbeatReq.setTopology(topologyMap);
+
heartbeatReq.setDataNodes(configManager.getNodeManager().getRegisteredDataNodeLocations());
+ }
+
/* Update heartbeat counter */
heartbeatCounter.getAndIncrement();
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
new file mode 100644
index 00000000000..7aa3ac91c59
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/service/TopologyService.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.service;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TNodeLocations;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResp;
+import org.apache.iotdb.common.rpc.thrift.TTestConnectionResult;
+import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.confignode.client.async.CnToDnAsyncRequestType;
+import
org.apache.iotdb.confignode.client.async.CnToDnInternalServiceAsyncRequestManager;
+import
org.apache.iotdb.confignode.client.async.handlers.DataNodeAsyncRequestContext;
+import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+import org.apache.iotdb.confignode.manager.IManager;
+import org.apache.iotdb.confignode.manager.load.cache.AbstractHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.IFailureDetector;
+import org.apache.iotdb.confignode.manager.load.cache.detector.FixedDetector;
+import
org.apache.iotdb.confignode.manager.load.cache.detector.PhiAccrualDetector;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeHeartbeatSample;
+import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
+import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
+import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
+
+import org.apache.ratis.util.AwaitForSignal;
+import org.apache.tsfile.utils.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+public class TopologyService implements Runnable, IClusterStatusSubscriber {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TopologyService.class);
+ /** Upper bound, in milliseconds, on how long the probing loop waits between two rounds. */
+ private static final long PROBING_INTERVAL_MS = 5_000L;
+ /** Timeout, in milliseconds, passed to the async request manager for one probing round. */
+ private static final long PROBING_TIMEOUT_MS = PROBING_INTERVAL_MS;
+ /** Maximum number of heartbeat samples kept per (from, to) DataNode pair. */
+ private static final int SAMPLING_WINDOW_SIZE = 100;
+
+ /** Single-thread executor that this service submits itself to as the probing task. */
+ private final ExecutorService topologyThread =
+ IoTDBThreadPoolFactory.newSingleThreadExecutor(
+ ThreadName.CONFIG_NODE_TOPOLOGY_SERVICE.getName());
+
+ /** Handle of the submitted probing task; {@code null} while no task has been submitted. */
+ private Future<?> future = null;
+ /** Callback that receives the latest topology at the end of a probing round. */
+ private final Consumer<Map<Integer, Set<Integer>>> topologyChangeListener;
+
+ /** Lets the probing loop wait for the interval or be woken up early by a node event. */
+ private final AwaitForSignal awaitForSignal;
+ private final IManager configManager;
+
+ /** Run flag checked by the probing loop and before notifying the listener. */
+ private final AtomicBoolean shouldRun;
+
+ /* (fromDataNodeId, toDataNodeId) -> heartbeat history, capped at SAMPLING_WINDOW_SIZE samples */
+ private final Map<Pair<Integer, Integer>, List<AbstractHeartbeatSample>>
heartbeats;
+ /** Ids of newly registered DataNodes that are left out of the probing list for now. */
+ private final List<Integer> startingDataNodes = new CopyOnWriteArrayList<>();
+
+ /** Decides from a pair's heartbeat history whether that connection is still available. */
+ private final IFailureDetector failureDetector;
+ private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
+
+ /**
+  * Creates the topology service. Nothing is scheduled here; the probing task is submitted by
+  * {@link #startTopologyService()}.
+  *
+  * @param configManager manager used to look up the registered DataNodes
+  * @param topologyChangeListener callback that receives the topology computed by each round
+  */
+ public TopologyService(
+     IManager configManager, Consumer<Map<Integer, Set<Integer>>> topologyChangeListener) {
+   this.topologyChangeListener = topologyChangeListener;
+   this.configManager = configManager;
+   this.shouldRun = new AtomicBoolean(false);
+   this.heartbeats = new ConcurrentHashMap<>();
+   this.awaitForSignal = new AwaitForSignal(this.getClass().getSimpleName());
+
+   // The detector type and its thresholds come from the ConfigNode configuration. Values that
+   // are configured in milliseconds are scaled by this factor before reaching the detectors.
+   final long nanosPerMilli = 1000_000L;
+   switch (CONF.getFailureDetector()) {
+     case IFailureDetector.PHI_ACCRUAL_DETECTOR:
+       this.failureDetector =
+           new PhiAccrualDetector(
+               CONF.getFailureDetectorPhiThreshold(),
+               CONF.getFailureDetectorPhiAcceptablePauseInMs() * nanosPerMilli,
+               CONF.getHeartbeatIntervalInMs() * 200_000L,
+               IFailureDetector.PHI_COLD_START_THRESHOLD,
+               new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * nanosPerMilli));
+       break;
+     case IFailureDetector.FIXED_DETECTOR:
+     default:
+       // Any unrecognised setting is treated like the fixed-threshold detector.
+       this.failureDetector =
+           new FixedDetector(CONF.getFailureDetectorFixedThresholdInMs() * nanosPerMilli);
+   }
+ }
+
+ /**
+  * Starts the probing loop if no probing task is currently submitted.
+  *
+  * <p>The run flag is raised before the task is submitted. {@link #run()} checks the flag
+  * before its first wait, so submitting first would let the loop observe {@code false} and
+  * return immediately while {@code future} stays non-null, leaving the service idle.
+  */
+ public synchronized void startTopologyService() {
+   shouldRun.set(true);
+   if (future == null) {
+     future = this.topologyThread.submit(this);
+   }
+   LOGGER.info("Topology Probing has started successfully");
+ }
+
+ /**
+  * Stops the probing loop and discards the collected heartbeat history.
+  *
+  * <p>Safe to call when the service was never started or has already been stopped: the task
+  * handle is only cancelled when one exists.
+  */
+ public synchronized void stopTopologyService() {
+   shouldRun.set(false);
+   if (future != null) {
+     // Interrupt the loop in case it is currently waiting for the next round.
+     future.cancel(true);
+     future = null;
+   }
+   heartbeats.clear();
+   LOGGER.info("Topology Probing has stopped successfully");
+ }
+
+ /**
+  * Blocks until the next {@link #topologyProbing} round is due. A round is due either:
+  *
+  * <ol>
+  *   <li>after at most PROBING_INTERVAL_MS, or
+  *   <li>earlier, when triggered by outside events (node restart / register, etc.).
+  * </ol>
+  *
+  * @return {@code true} if the wait ended normally, {@code false} if the thread was interrupted
+  */
+ private boolean mayWait() {
+   try {
+     this.awaitForSignal.await(PROBING_INTERVAL_MS, TimeUnit.MILLISECONDS);
+     return true;
+   } catch (InterruptedException e) {
+     // Keep the interrupt visible to the caller instead of silently swallowing it.
+     Thread.currentThread().interrupt();
+     return false;
+   }
+ }
+
+ /**
+  * Probing loop. Each iteration first checks the run flag, then waits for the next round to be
+  * due, then probes. The loop ends as soon as the flag is cleared or the wait is interrupted.
+  */
+ @Override
+ public void run() {
+   for (; ; ) {
+     // Same short-circuit order as a combined condition: no waiting once the flag is cleared.
+     if (!shouldRun.get() || !mayWait()) {
+       return;
+     }
+     topologyProbing();
+   }
+ }
+
+ /**
+  * Runs one probing round: asks the DataNodes to test their connections to each other, records
+  * a heartbeat sample for every successful connection, evaluates every recorded connection with
+  * the failure detector and hands the resulting topology to the listener.
+  */
+ private synchronized void topologyProbing() {
+   // 1. get the latest datanode list
+   final List<TDataNodeLocation> dataNodeLocations = new ArrayList<>();
+   final Set<Integer> dataNodeIds = new HashSet<>();
+   for (final TDataNodeConfiguration dataNodeConf :
+       configManager.getNodeManager().getRegisteredDataNodes()) {
+     final TDataNodeLocation location = dataNodeConf.getLocation();
+     if (startingDataNodes.contains(location.getDataNodeId())) {
+       continue; // we shall wait for internal endpoint to be ready
+     }
+     dataNodeLocations.add(location);
+     dataNodeIds.add(location.getDataNodeId());
+   }
+
+   // 2. send the verify connection RPC to all datanodes
+   final TNodeLocations nodeLocations = new TNodeLocations();
+   nodeLocations.setDataNodeLocations(dataNodeLocations);
+   nodeLocations.setConfigNodeLocations(Collections.emptyList());
+   final Map<Integer, TDataNodeLocation> dataNodeLocationMap =
+       configManager.getNodeManager().getRegisteredDataNodes().stream()
+           .map(TDataNodeConfiguration::getLocation)
+           .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, location -> location));
+   final DataNodeAsyncRequestContext<TNodeLocations, TTestConnectionResp>
+       dataNodeAsyncRequestContext =
+           new DataNodeAsyncRequestContext<>(
+               CnToDnAsyncRequestType.SUBMIT_TEST_DN_INTERNAL_CONNECTION_TASK,
+               nodeLocations,
+               dataNodeLocationMap);
+   CnToDnInternalServiceAsyncRequestManager.getInstance()
+       .sendAsyncRequestWithTimeoutInMs(dataNodeAsyncRequestContext, PROBING_TIMEOUT_MS);
+   final List<TTestConnectionResult> results = new ArrayList<>();
+   dataNodeAsyncRequestContext
+       .getResponseMap()
+       .forEach(
+           (nodeId, resp) -> {
+             if (resp.isSetResultList()) {
+               results.addAll(resp.getResultList());
+             }
+           });
+
+   // 3. collect results and update the heartbeat timestamps
+   for (final TTestConnectionResult result : results) {
+     final int fromDataNodeId =
+         Optional.ofNullable(result.getSender().getDataNodeLocation())
+             .map(TDataNodeLocation::getDataNodeId)
+             .orElse(-1);
+     final int toDataNodeId = result.getServiceProvider().getNodeId();
+     if (result.isSuccess()
+         && dataNodeIds.contains(fromDataNodeId)
+         && dataNodeIds.contains(toDataNodeId)) {
+       // testAllDataNodeConnectionWithTimeout ensures the heartbeats are Dn-Dn internally.
+       // Here we just double-check.
+       final List<AbstractHeartbeatSample> heartbeatHistory =
+           heartbeats.computeIfAbsent(
+               new Pair<>(fromDataNodeId, toDataNodeId), p -> new LinkedList<>());
+       heartbeatHistory.add(new NodeHeartbeatSample(NodeStatus.Running));
+       if (heartbeatHistory.size() > SAMPLING_WINDOW_SIZE) {
+         // drop the oldest sample so the window stays bounded
+         heartbeatHistory.remove(0);
+       }
+     }
+   }
+
+   // 4. use failure detector to identify potential network partitions
+   final Map<Integer, Set<Integer>> latestTopology =
+       dataNodeLocations.stream()
+           .collect(Collectors.toMap(TDataNodeLocation::getDataNodeId, k -> new HashSet<>()));
+   final Map<Integer, Set<Integer>> partitioned = new HashMap<>();
+   for (final Map.Entry<Pair<Integer, Integer>, List<AbstractHeartbeatSample>> entry :
+       heartbeats.entrySet()) {
+     final int fromId = entry.getKey().getLeft();
+     final int toId = entry.getKey().getRight();
+     if (!entry.getValue().isEmpty()
+         && !failureDetector.isAvailable(entry.getKey(), entry.getValue())) {
+       LOGGER.debug("Connection from DataNode {} to DataNode {} is broken", fromId, toId);
+       partitioned.computeIfAbsent(fromId, id -> new HashSet<>()).add(toId);
+     } else {
+       // The history can still hold a sender that is not part of this round's DataNode list
+       // (it is listed in startingDataNodes, or is no longer registered). Skip it rather than
+       // fail with a NullPointerException, which would end the probing task for good.
+       final Set<Integer> reachable = latestTopology.get(fromId);
+       if (reachable != null) {
+         reachable.add(toId);
+       }
+     }
+   }
+
+   if (!partitioned.isEmpty()) {
+     logAsymmetricPartition(partitioned);
+   }
+
+   // 5. notify the listeners on topology change
+   if (shouldRun.get()) {
+     topologyChangeListener.accept(latestTopology);
+   }
+ }
+
+ /**
+  * Logs a warning for every broken connection whose opposite direction is not reported as
+  * broken as well.
+  *
+  * @param partitioned broken connections: sender id mapped to the receiver ids it cannot reach
+  */
+ private void logAsymmetricPartition(final Map<Integer, Set<Integer>> partitioned) {
+   partitioned.forEach(
+       (sender, unreachable) -> {
+         for (final Integer receiver : unreachable) {
+           // Look the opposite direction up once; it is absent when the receiver reported
+           // no broken connections at all.
+           final Set<Integer> reverse = partitioned.get(receiver);
+           if (reverse == null || !reverse.contains(sender)) {
+             LOGGER.warn("[Topology] Asymmetric network partition from {} to {}", sender, receiver);
+           }
+         }
+       });
+ }
+
+ /**
+  * Reacts to DataNode register / remove / restart events; changes of nodes that are not
+  * registered DataNodes are ignored.
+  *
+  * @param event the node statistics change, mapping node id to its (previous, current) pair
+  */
+ @Override
+ public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
+   final Set<Integer> registeredIds =
+       configManager.getNodeManager().getRegisteredDataNodeLocations().keySet();
+   for (final Map.Entry<Integer, Pair<NodeStatistics, NodeStatistics>> change :
+       event.getDifferentNodeStatisticsMap().entrySet()) {
+     // Must stay a boxed Integer: List.remove(Object) below removes by value, not by index.
+     final Integer nodeId = change.getKey();
+     if (!registeredIds.contains(nodeId)) {
+       continue;
+     }
+     final NodeStatistics previous = change.getValue().getLeft();
+     final NodeStatistics current = change.getValue().getRight();
+
+     if (previous == null) {
+       // if a new datanode registered, DO NOT trigger probing immediately
+       startingDataNodes.add(nodeId);
+       continue;
+     }
+     startingDataNodes.remove(nodeId);
+
+     if (current == null) {
+       // datanode removed from cluster, clean up probing history
+       heartbeats
+           .keySet()
+           .removeIf(
+               pair ->
+                   Objects.equals(pair.getLeft(), nodeId)
+                       || Objects.equals(pair.getRight(), nodeId));
+       continue;
+     }
+
+     // we only trigger probing immediately if node comes around from UNKNOWN to RUNNING
+     if (NodeStatus.Unknown.equals(previous.getStatus())
+         && NodeStatus.Running.equals(current.getStatus())) {
+       awaitForSignal.signal();
+     }
+   }
+ }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
index 6c1cf3c3fad..83f1dc1bc83 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/IClusterStatusSubscriber.java
@@ -26,13 +26,13 @@ public interface IClusterStatusSubscriber {
@Subscribe
@AllowConcurrentEvents
- void onNodeStatisticsChanged(NodeStatisticsChangeEvent event);
+ default void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {}
@Subscribe
@AllowConcurrentEvents
- void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent event);
+ default void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent
event) {}
@Subscribe
@AllowConcurrentEvents
- void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent
event);
+ default void
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
index 11da0dc2998..caf3db4806d 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/subscriber/NodeStatisticsChangeEvent.java
@@ -23,6 +23,7 @@ import
org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.tsfile.utils.Pair;
+import java.util.Collections;
import java.util.Map;
/** NodeStatisticsChangeEvent represents the change of Node statistics. */
@@ -37,6 +38,6 @@ public class NodeStatisticsChangeEvent {
}
public Map<Integer, Pair<NodeStatistics, NodeStatistics>>
getDifferentNodeStatisticsMap() {
- return differentNodeStatisticsMap;
+ return Collections.unmodifiableMap(differentNodeStatisticsMap);
}
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
index 9121ba3caa4..01ede42afcd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeLeaderChangeHandler.java
@@ -27,8 +27,6 @@ import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.cache.consensus.ConsensusGroupStatistics;
import
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
-import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
-import
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
import org.apache.tsfile.utils.Pair;
@@ -59,16 +57,6 @@ public class PipeLeaderChangeHandler implements
IClusterStatusSubscriber {
onConsensusGroupStatisticsChanged(new
ConsensusGroupStatisticsChangeEvent(virtualChangeMap));
}
- @Override
- public void onNodeStatisticsChanged(NodeStatisticsChangeEvent event) {
- // Do nothing
- }
-
- @Override
- public void onRegionGroupStatisticsChanged(RegionGroupStatisticsChangeEvent
event) {
- // Do nothing
- }
-
@Override
public void
onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) {
// If no pipe tasks, return
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
index 5b6369a2c3c..1e8f32eda76 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/coordinator/runtime/PipeRuntimeCoordinator.java
@@ -24,8 +24,6 @@ import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.confignode.manager.ConfigManager;
import
org.apache.iotdb.confignode.manager.load.subscriber.ConsensusGroupStatisticsChangeEvent;
import
org.apache.iotdb.confignode.manager.load.subscriber.IClusterStatusSubscriber;
-import
org.apache.iotdb.confignode.manager.load.subscriber.NodeStatisticsChangeEvent;
-import
org.apache.iotdb.confignode.manager.load.subscriber.RegionGroupStatisticsChangeEvent;
import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeat;
import
org.apache.iotdb.confignode.manager.pipe.coordinator.runtime.heartbeat.PipeHeartbeatScheduler;
@@ -70,16 +68,6 @@ public class PipeRuntimeCoordinator implements
IClusterStatusSubscriber {
pipeLeaderChangeHandler.onConfigRegionGroupLeaderChanged();
}
- @Override
- public void onNodeStatisticsChanged(final NodeStatisticsChangeEvent event) {
- // Do nothing
- }
-
- @Override
- public void onRegionGroupStatisticsChanged(final
RegionGroupStatisticsChangeEvent event) {
- // Do nothing
- }
-
@Override
public synchronized void onConsensusGroupStatisticsChanged(
final ConsensusGroupStatisticsChangeEvent event) {
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
index 69f073aad8b..306146f6b90 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/RegionGroupCacheTest.java
@@ -18,6 +18,8 @@
*/
package org.apache.iotdb.confignode.manager.load.cache;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionGroupCache;
import
org.apache.iotdb.confignode.manager.load.cache.region.RegionHeartbeatSample;
@@ -32,12 +34,15 @@ import java.util.stream.Stream;
public class RegionGroupCacheTest {
private static final String DATABASE = "root.db";
+ private static final TConsensusGroupId GROUP_ID =
+ new TConsensusGroupId(TConsensusGroupType.DataRegion, 1);
@Test
public void getRegionStatusTest() {
long currentTime = System.nanoTime();
RegionGroupCache regionGroupCache =
- new RegionGroupCache(DATABASE, Stream.of(0, 1, 2, 3,
4).collect(Collectors.toSet()), false);
+ new RegionGroupCache(
+ DATABASE, GROUP_ID, Stream.of(0, 1, 2, 3,
4).collect(Collectors.toSet()), false);
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.cacheHeartbeatSample(
@@ -66,7 +71,8 @@ public class RegionGroupCacheTest {
public void weakConsistencyRegionGroupStatusTest() {
long currentTime = System.nanoTime();
RegionGroupCache regionGroupCache =
- new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()), false);
+ new RegionGroupCache(
+ DATABASE, GROUP_ID, Stream.of(0, 1,
2).collect(Collectors.toSet()), false);
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.cacheHeartbeatSample(
@@ -102,7 +108,8 @@ public class RegionGroupCacheTest {
public void strongConsistencyRegionGroupStatusTest() {
long currentTime = System.nanoTime();
RegionGroupCache regionGroupCache =
- new RegionGroupCache(DATABASE, Stream.of(0, 1,
2).collect(Collectors.toSet()), true);
+ new RegionGroupCache(
+ DATABASE, GROUP_ID, Stream.of(0, 1,
2).collect(Collectors.toSet()), true);
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.cacheHeartbeatSample(
@@ -137,7 +144,7 @@ public class RegionGroupCacheTest {
public void migrateRegionRegionGroupStatusTest() {
long currentTime = System.nanoTime();
RegionGroupCache regionGroupCache =
- new RegionGroupCache(DATABASE,
Stream.of(0).collect(Collectors.toSet()), true);
+ new RegionGroupCache(DATABASE, GROUP_ID,
Stream.of(0).collect(Collectors.toSet()), true);
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.updateCurrentStatistics();
@@ -145,7 +152,7 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Running,
regionGroupCache.getCurrentStatistics().getRegionGroupStatus());
regionGroupCache =
- new RegionGroupCache(DATABASE, Stream.of(0,
1).collect(Collectors.toSet()), true);
+ new RegionGroupCache(DATABASE, GROUP_ID, Stream.of(0,
1).collect(Collectors.toSet()), true);
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.cacheHeartbeatSample(
@@ -155,7 +162,7 @@ public class RegionGroupCacheTest {
RegionGroupStatus.Running,
regionGroupCache.getCurrentStatistics().getRegionGroupStatus());
regionGroupCache =
- new RegionGroupCache(DATABASE, Stream.of(0,
1).collect(Collectors.toSet()), true);
+ new RegionGroupCache(DATABASE, GROUP_ID, Stream.of(0,
1).collect(Collectors.toSet()), true);
regionGroupCache.cacheHeartbeatSample(
0, new RegionHeartbeatSample(currentTime, RegionStatus.Running));
regionGroupCache.cacheHeartbeatSample(
diff --git
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
index a56a7166646..60af7142a6c 100644
---
a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
+++
b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/cache/detector/DetectorTest.java
@@ -33,6 +33,7 @@ import java.util.List;
public class DetectorTest {
final long sec = 1_000_000_000L;
+ final long id = 114514L; // ;)
final FixedDetector fixedDetector = new FixedDetector(20 * sec);
final PhiAccrualDetector phiAccrualDetector =
new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 0,
fixedDetector);
@@ -53,13 +54,13 @@ public class DetectorTest {
final long lastHeartbeatTs = System.nanoTime() - 21 * sec;
final List<AbstractHeartbeatSample> history =
Collections.singletonList(new NodeHeartbeatSample(lastHeartbeatTs,
NodeStatus.Running));
- Assert.assertFalse(fixedDetector.isAvailable(history));
+ Assert.assertFalse(fixedDetector.isAvailable(id, history));
final long lastAvailableHeartbeat = System.nanoTime() - 18 * sec;
final List<AbstractHeartbeatSample> history2 =
Collections.singletonList(
new NodeHeartbeatSample(lastAvailableHeartbeat,
NodeStatus.Running));
- Assert.assertTrue(fixedDetector.isAvailable(history2));
+ Assert.assertTrue(fixedDetector.isAvailable(id, history2));
}
@Test
@@ -107,8 +108,8 @@ public class DetectorTest {
public void testComparisonQuickFailureDetection() {
long[] interval = new long[] {sec, sec, sec};
List<AbstractHeartbeatSample> history = fromInterval(interval, 13 * sec);
- Assert.assertTrue(fixedDetector.isAvailable(history));
- Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+ Assert.assertTrue(fixedDetector.isAvailable(id, history));
+ Assert.assertFalse(phiAccrualDetector.isAvailable(id, history));
}
/**
@@ -121,8 +122,8 @@ public class DetectorTest {
long[] interval = new long[] {sec, sec, sec};
long gcPause = 15 * sec;
List<AbstractHeartbeatSample> history = fromInterval(interval, gcPause + 2
* sec);
- Assert.assertTrue(fixedDetector.isAvailable(history));
- Assert.assertFalse(phiAccrualDetector.isAvailable(history));
+ Assert.assertTrue(fixedDetector.isAvailable(id, history));
+ Assert.assertFalse(phiAccrualDetector.isAvailable(id, history));
}
/**
@@ -148,8 +149,8 @@ public class DetectorTest {
sec
};
List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
- Assert.assertFalse(fixedDetector.isAvailable(history));
- Assert.assertTrue(phiAccrualDetector.isAvailable(history));
+ Assert.assertFalse(fixedDetector.isAvailable(id, history));
+ Assert.assertTrue(phiAccrualDetector.isAvailable(id, history));
}
/**
@@ -161,8 +162,8 @@ public class DetectorTest {
new PhiAccrualDetector(30, 10 * sec, (long) (0.2 * sec), 60,
fixedDetector);
long[] interval = new long[] {sec, sec, sec};
List<AbstractHeartbeatSample> history = fromInterval(interval, 21 * sec);
- Assert.assertFalse(fixedDetector.isAvailable(history));
- Assert.assertFalse(coldStartPhi.isAvailable(history));
+ Assert.assertFalse(fixedDetector.isAvailable(id, history));
+ Assert.assertFalse(coldStartPhi.isAvailable(id, history));
}
private List<AbstractHeartbeatSample> fromInterval(long[] interval, long
timeElapsed) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
index 9672d431c38..04dfb20879d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/AsyncTSStatusRPCHandler.java
@@ -33,6 +33,7 @@ import java.util.concurrent.CountDownLatch;
/** General RPC handler for TSStatus response type. */
public class AsyncTSStatusRPCHandler extends
DataNodeAsyncRequestRPCHandler<TSStatus> {
+ private final boolean keepSilent;
private static final Logger LOGGER =
LoggerFactory.getLogger(AsyncTSStatusRPCHandler.class);
public AsyncTSStatusRPCHandler(
@@ -41,8 +42,10 @@ public class AsyncTSStatusRPCHandler extends
DataNodeAsyncRequestRPCHandler<TSSt
TDataNodeLocation targetDataNode,
Map<Integer, TDataNodeLocation> dataNodeLocationMap,
Map<Integer, TSStatus> responseMap,
- CountDownLatch countDownLatch) {
+ CountDownLatch countDownLatch,
+ boolean keepSilent) {
super(requestType, requestId, targetDataNode, dataNodeLocationMap,
responseMap, countDownLatch);
+ this.keepSilent = keepSilent;
}
@Override
@@ -53,13 +56,17 @@ public class AsyncTSStatusRPCHandler extends
DataNodeAsyncRequestRPCHandler<TSSt
if (response.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
// Remove only if success
nodeLocationMap.remove(requestId);
- LOGGER.info("Successfully {} on DataNode: {}", requestType,
formattedTargetLocation);
+ if (!keepSilent) {
+ LOGGER.info("Successfully {} on DataNode: {}", requestType,
formattedTargetLocation);
+ }
} else {
- LOGGER.error(
- "Failed to {} on DataNode: {}, response: {}",
- requestType,
- formattedTargetLocation,
- response);
+ if (!keepSilent) {
+ LOGGER.error(
+ "Failed to {} on DataNode: {}, response: {}",
+ requestType,
+ formattedTargetLocation,
+ response);
+ }
}
// Always CountDown
@@ -75,7 +82,9 @@ public class AsyncTSStatusRPCHandler extends
DataNodeAsyncRequestRPCHandler<TSSt
+ formattedTargetLocation
+ ", exception: "
+ e.getMessage();
- LOGGER.error(errorMsg);
+ if (!keepSilent) {
+ LOGGER.error(errorMsg);
+ }
responseMap.put(
requestId,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
index a15e4a0b10f..5976deb6d50 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeAsyncRequestRPCHandler.java
@@ -68,18 +68,24 @@ public abstract class
DataNodeAsyncRequestRPCHandler<Response>
final Map<Integer, TDataNodeLocation> nodeLocationMap =
context.getNodeLocationMap();
final Map<Integer, ?> responseMap = context.getResponseMap();
final CountDownLatch countDownLatch = context.getCountDownLatch();
+ final boolean keepSilent;
switch (requestType) {
case TEST_CONNECTION:
+ keepSilent = true;
+ break;
case UPDATE_ATTRIBUTE:
- return new AsyncTSStatusRPCHandler(
- requestType,
- requestId,
- targetDataNode,
- nodeLocationMap,
- (Map<Integer, TSStatus>) responseMap,
- countDownLatch);
+ keepSilent = false;
+ break;
default:
throw new UnsupportedOperationException("request type is not
supported: " + requestType);
}
+ return new AsyncTSStatusRPCHandler(
+ requestType,
+ requestId,
+ targetDataNode,
+ nodeLocationMap,
+ (Map<Integer, TSStatus>) responseMap,
+ countDownLatch,
+ keepSilent);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
new file mode 100644
index 00000000000..d0ba1ba389f
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/dn/DataNodeIntraHeartbeatManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.protocol.client.dn;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.commons.client.request.AsyncRequestContext;
+import org.apache.iotdb.commons.client.request.AsyncRequestRPCHandler;
+import
org.apache.iotdb.commons.client.request.DataNodeIntraHeartbeatRequestManager;
+
+/**
+ * Singleton request manager for DataNode-to-DataNode requests built on {@link
+ * DataNodeIntraHeartbeatRequestManager}. The only request type registered here is {@link
+ * DnToDnRequestType#TEST_CONNECTION}.
+ *
+ * <p>NOTE(review): presumably the parent class routes these requests through a dedicated
+ * heartbeat client pool; that is not visible in this file, so confirm against the parent.
+ */
+public class DataNodeIntraHeartbeatManager
+ extends DataNodeIntraHeartbeatRequestManager<DnToDnRequestType> {
+
+ /**
+ * Registers the action for {@code TEST_CONNECTION}: an empty test-connection RPC issued on the
+ * client, with the handler cast to {@link AsyncTSStatusRPCHandler}.
+ */
+ @Override
+ protected void initActionMapBuilder() {
+ actionMapBuilder.put(
+ DnToDnRequestType.TEST_CONNECTION,
+ (req, client, handler) ->
client.testConnectionEmptyRPC((AsyncTSStatusRPCHandler) handler));
+ }
+
+ /**
+ * Builds the per-target RPC handler by delegating to {@link
+ * DataNodeAsyncRequestRPCHandler#createAsyncRPCHandler}.
+ *
+ * @param requestContext context shared by all requests of this batch
+ * @param requestId id of the request this handler serves
+ * @param targetNode DataNode the request is sent to
+ * @return the handler created for this request
+ */
+ @Override
+ protected AsyncRequestRPCHandler<?, DnToDnRequestType, TDataNodeLocation>
buildHandler(
+ AsyncRequestContext<?, ?, DnToDnRequestType, TDataNodeLocation>
requestContext,
+ int requestId,
+ TDataNodeLocation targetNode) {
+ return DataNodeAsyncRequestRPCHandler.createAsyncRPCHandler(
+ requestContext, requestId, targetNode);
+ }
+
+ /** Holder class owning the single {@link DataNodeIntraHeartbeatManager} instance. */
+ private static class ClientPoolHolder {
+
+ private static final DataNodeIntraHeartbeatManager INSTANCE =
+ new DataNodeIntraHeartbeatManager();
+
+ private ClientPoolHolder() {
+ // Empty constructor
+ }
+ }
+
+ /**
+ * @return the shared instance held by {@link ClientPoolHolder}
+ */
+ public static DataNodeIntraHeartbeatManager getInstance() {
+ return DataNodeIntraHeartbeatManager.ClientPoolHolder.INSTANCE;
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b2d380c35ca..47c84e5857b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -87,6 +87,7 @@ import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import
org.apache.iotdb.db.protocol.client.cn.DnToCnInternalServiceAsyncRequestManager;
import org.apache.iotdb.db.protocol.client.cn.DnToCnRequestType;
import
org.apache.iotdb.db.protocol.client.dn.DataNodeExternalServiceAsyncRequestManager;
+import org.apache.iotdb.db.protocol.client.dn.DataNodeIntraHeartbeatManager;
import
org.apache.iotdb.db.protocol.client.dn.DataNodeMPPServiceAsyncRequestManager;
import
org.apache.iotdb.db.protocol.client.dn.DnToDnInternalServiceAsyncRequestManager;
import org.apache.iotdb.db.protocol.client.dn.DnToDnRequestType;
@@ -104,6 +105,7 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceManage
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.ISchemaSource;
import
org.apache.iotdb.db.queryengine.execution.operator.schema.source.SchemaSourceFactory;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.ClusterPartitionFetcher;
import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
@@ -350,6 +352,8 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
private final DataNodeThrottleQuotaManager throttleQuotaManager =
DataNodeThrottleQuotaManager.getInstance();
+ private final ClusterTopology clusterTopology =
ClusterTopology.getInstance();
+
private final CommonConfig commonConfig =
CommonDescriptor.getInstance().getConfig();
private static final String SYSTEM = "system";
@@ -1763,6 +1767,14 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
.collect(Collectors.toList()));
}
+ /**
+ * Tests connectivity from this DataNode to the given DataNodes via {@link
+ * #testAllDataNodeConnectionInHeartbeatChannel} and wraps the per-node results in a response.
+ *
+ * <p>The response status is always {@code SUCCESS_STATUS}; individual connection outcomes are
+ * carried in the result list.
+ *
+ * @param nodeLocations node locations; only its DataNode locations are probed here
+ * @return a response holding a success status and the per-DataNode test results
+ */
+ @Override
+ public TTestConnectionResp submitInternalTestConnectionTask(TNodeLocations
nodeLocations)
+ throws TException {
+ return new TTestConnectionResp(
+ new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()),
+
testAllDataNodeConnectionInHeartbeatChannel(nodeLocations.getDataNodeLocations()));
+ }
+
private static <Location, RequestType> List<TTestConnectionResult>
testConnections(
final List<Location> nodeLocations,
final Function<Location, Integer> getId,
@@ -1790,6 +1802,18 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
DnToCnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequest(handler));
}
+ /**
+ * Sends a {@code TEST_CONNECTION} request to the internal endpoint of every given DataNode
+ * through {@link DataNodeIntraHeartbeatManager} and collects the results via {@code
+ * testConnections}.
+ *
+ * <p>NOTE(review): the trailing {@code sendAsyncRequest} arguments {@code (1, null, true)} are
+ * presumably a retry count, an optional timeout and a keep-silent flag; confirm against the
+ * {@code sendAsyncRequest} signature.
+ *
+ * @param dataNodeLocations DataNodes to probe
+ * @return one test result per probed DataNode, as produced by {@code testConnections}
+ */
+ private List<TTestConnectionResult>
testAllDataNodeConnectionInHeartbeatChannel(
+ List<TDataNodeLocation> dataNodeLocations) {
+ return testConnections(
+ dataNodeLocations,
+ TDataNodeLocation::getDataNodeId,
+ TDataNodeLocation::getInternalEndPoint,
+ TServiceType.DataNodeInternalService,
+ DnToDnRequestType.TEST_CONNECTION,
+ (AsyncRequestContext<Object, TSStatus, DnToDnRequestType,
TDataNodeLocation> handler) ->
+
DataNodeIntraHeartbeatManager.getInstance().sendAsyncRequest(handler, 1, null,
true));
+ }
+
private List<TTestConnectionResult> testAllDataNodeInternalServiceConnection(
List<TDataNodeLocation> dataNodeLocations) {
return testConnections(
@@ -1921,6 +1945,10 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
}
}
+ if (req.isSetTopology() && req.isSetDataNodes()) {
+ clusterTopology.updateTopology(req.getDataNodes(), req.getTopology());
+ }
+
return resp;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
new file mode 100644
index 00000000000..2357a5f50f1
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/ClusterTopology.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Singleton holding this DataNode's latest view of cluster connectivity, as delivered through
+ * {@link #updateTopology}. Planners use it to drop replicas that are currently unreachable.
+ *
+ * <p>{@code topologyMap} maps a DataNode id to the set of DataNode ids it can reach. This node is
+ * considered partitioned when the set it can reach is non-empty but smaller than the set of all
+ * DataNodes in the topology. While the node is not partitioned every filter method returns its
+ * input untouched.
+ *
+ * <p>State is published through atomic references without a common lock, so a reader may briefly
+ * observe a new topology snapshot together with an old {@code isPartitioned} flag; readers
+ * therefore tolerate missing entries instead of assuming they exist.
+ */
+public class ClusterTopology {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ClusterTopology.class);
+  private final Integer myself;
+  private final AtomicReference<Map<Integer, TDataNodeLocation>> dataNodes;
+  private final AtomicReference<Map<Integer, Set<Integer>>> topologyMap;
+  private final AtomicBoolean isPartitioned = new AtomicBoolean();
+
+  public static ClusterTopology getInstance() {
+    return ClusterTopologyHolder.INSTANCE;
+  }
+
+  /**
+   * Restricts a replica set to the replicas this DataNode can currently reach.
+   *
+   * @param origin the replica set to validate, may be {@code null}
+   * @return {@code origin} itself when this node is not partitioned, when {@code origin} (or its
+   *     location list) is {@code null}, or when the current snapshot holds no reachability
+   *     information for this node; otherwise a new replica set with the same region id that
+   *     contains only the reachable locations (possibly none)
+   */
+  public TRegionReplicaSet getValidatedReplicaSet(TRegionReplicaSet origin) {
+    if (!isPartitioned.get() || origin == null || origin.getDataNodeLocations() == null) {
+      return origin;
+    }
+    final Set<Integer> reachableToMyself = topologyMap.get().get(myself);
+    if (reachableToMyself == null || reachableToMyself.isEmpty()) {
+      // A concurrent updateTopology may have published a snapshot without this node after the
+      // isPartitioned check above. updateTopology treats that case as "not partitioned", so do
+      // the same here instead of failing with a NullPointerException.
+      return origin;
+    }
+    final List<TDataNodeLocation> locations = new ArrayList<>();
+    for (final TDataNodeLocation location : origin.getDataNodeLocations()) {
+      if (reachableToMyself.contains(location.getDataNodeId())) {
+        locations.add(location);
+      }
+    }
+    return new TRegionReplicaSet(origin.getRegionId(), locations);
+  }
+
+  /**
+   * Filters the entries of a replica-set-keyed map down to those that still have at least one
+   * replica on a DataNode able to reach every replica set of the input.
+   *
+   * @param input entries keyed by replica set
+   * @return {@code input} itself when this node is not partitioned (or {@code input} is {@code
+   *     null}); otherwise a new entry set whose keys are the reduced replica sets and whose
+   *     values are the original values. Entries without any valid replica are dropped.
+   */
+  public <T> Set<Map.Entry<TRegionReplicaSet, T>> filterReachableCandidates(
+      Set<Map.Entry<TRegionReplicaSet, T>> input) {
+    if (!isPartitioned.get() || input == null) {
+      return input;
+    }
+    final List<TRegionReplicaSet> allSets =
+        input.stream().map(Map.Entry::getKey).collect(Collectors.toList());
+    final List<TRegionReplicaSet> candidates = getReachableCandidates(allSets);
+    final Map<TConsensusGroupId, TRegionReplicaSet> newMap = new HashMap<>();
+    candidates.forEach(set -> newMap.put(set.getRegionId(), set));
+    final Map<TRegionReplicaSet, T> candidateMap = new HashMap<>();
+    for (final Map.Entry<TRegionReplicaSet, T> entry : input) {
+      final TConsensusGroupId gid = entry.getKey().getRegionId();
+      final TRegionReplicaSet replicaSet = newMap.get(gid);
+      if (replicaSet != null) {
+        candidateMap.put(replicaSet, entry.getValue());
+      }
+    }
+    return candidateMap.entrySet();
+  }
+
+  /**
+   * Reduces each replica set to the locations hosted on "candidate" DataNodes, i.e. DataNodes
+   * that can reach at least one replica of EVERY replica set in {@code all}.
+   *
+   * @param all replica sets involved in one plan
+   * @return {@code all} itself when not partitioned, {@code null} or empty; otherwise the reduced
+   *     replica sets, omitting those without any location on a candidate DataNode
+   */
+  private List<TRegionReplicaSet> getReachableCandidates(List<TRegionReplicaSet> all) {
+    if (!isPartitioned.get() || all == null || all.isEmpty()) {
+      return all;
+    }
+    // Read each snapshot once so that both phases below work on consistent data.
+    final Map<Integer, Set<Integer>> topologyMapCurrent = this.topologyMap.get();
+    final Map<Integer, TDataNodeLocation> dataNodesCurrent = this.dataNodes.get();
+
+    // brute-force search to select DataNode candidates that can communicate to all
+    // TRegionReplicaSets
+    final Set<Integer> dataNodeCandidates = new HashSet<>();
+    for (final Map.Entry<Integer, Set<Integer>> entry : topologyMapCurrent.entrySet()) {
+      if (canReachAllReplicaSets(entry.getValue(), all)) {
+        dataNodeCandidates.add(entry.getKey());
+      }
+    }
+
+    // select TRegionReplicaSet candidates whose DataNode Locations contain at least one
+    // candidate DataNode
+    final List<TRegionReplicaSet> reachableSetCandidates = new ArrayList<>();
+    for (final TRegionReplicaSet replicaSet : all) {
+      final List<TDataNodeLocation> validLocations = new ArrayList<>();
+      for (final TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
+        final Integer dataNodeId = location.getDataNodeId();
+        if (!dataNodeCandidates.contains(dataNodeId)) {
+          continue;
+        }
+        // Prefer the location pushed with the topology; fall back to the replica set's own
+        // location so that a node missing from the node map never yields a null entry.
+        final TDataNodeLocation latest = dataNodesCurrent.get(dataNodeId);
+        validLocations.add(latest != null ? latest : location);
+      }
+      if (!validLocations.isEmpty()) {
+        reachableSetCandidates.add(
+            new TRegionReplicaSet(replicaSet.getRegionId(), validLocations));
+      }
+    }
+
+    return reachableSetCandidates;
+  }
+
+  /**
+   * Tells whether a DataNode with the given reachable set can reach at least one replica of each
+   * replica set. Every replica set must be satisfied; a single unreachable set rejects the node.
+   */
+  private static boolean canReachAllReplicaSets(
+      final Set<Integer> reachable, final List<TRegionReplicaSet> replicaSets) {
+    if (reachable == null) {
+      return false;
+    }
+    for (final TRegionReplicaSet replicaSet : replicaSets) {
+      boolean reachesThisSet = false;
+      for (final TDataNodeLocation location : replicaSet.getDataNodeLocations()) {
+        if (reachable.contains(location.getDataNodeId())) {
+          reachesThisSet = true;
+          break;
+        }
+      }
+      if (!reachesThisSet) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * Publishes the latest node map and topology and recomputes whether this node is partitioned.
+   *
+   * @param dataNodes DataNode id to location
+   * @param latestTopology DataNode id to the ids of the DataNodes it can reach
+   */
+  public void updateTopology(
+      final Map<Integer, TDataNodeLocation> dataNodes, Map<Integer, Set<Integer>> latestTopology) {
+    if (!latestTopology.equals(topologyMap.get())) {
+      LOGGER.info("[Topology] latest view from config-node: {}", latestTopology);
+    }
+    this.dataNodes.set(dataNodes);
+    this.topologyMap.set(latestTopology);
+    final Set<Integer> reachableToMyself = latestTopology.get(myself);
+    // A missing or empty entry means the latest topology doesn't include this node's information.
+    // This mostly happens when this node has just started and hasn't reported connection details,
+    // so it is not treated as a partition.
+    final boolean partitionedNow =
+        reachableToMyself != null
+            && !reachableToMyself.isEmpty()
+            && reachableToMyself.size() != latestTopology.keySet().size();
+    this.isPartitioned.set(partitionedNow);
+    // Use the locally computed values: re-reading the shared flag could pair it with a different
+    // snapshot published by a concurrent update.
+    if (partitionedNow && LOGGER.isDebugEnabled()) {
+      final Set<Integer> allDataLocations = new HashSet<>(latestTopology.keySet());
+      allDataLocations.removeAll(reachableToMyself);
+      final String partitioned =
+          allDataLocations.stream().map(String::valueOf).collect(Collectors.joining(","));
+      LOGGER.debug("This DataNode {} is partitioned with [{}]", myself, partitioned);
+    }
+  }
+
+  private ClusterTopology() {
+    this.myself =
+        IoTDBDescriptor.getInstance().getConfig().generateLocalDataNodeLocation().getDataNodeId();
+    this.isPartitioned.set(false);
+    this.topologyMap = new AtomicReference<>(Collections.emptyMap());
+    this.dataNodes = new AtomicReference<>(Collections.emptyMap());
+  }
+
+  /** Holder class owning the single {@link ClusterTopology} instance. */
+  private static class ClusterTopologyHolder {
+
+    private static final ClusterTopology INSTANCE = new ClusterTopology();
+
+    private ClusterTopologyHolder() {}
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 3ed0ffa7fe4..ee11364947b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -27,9 +27,11 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
import org.apache.iotdb.db.queryengine.plan.expression.Expression;
import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
+import
org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
@@ -44,6 +46,7 @@ import
org.apache.iotdb.db.queryengine.plan.statement.metadata.ShowTimeSeriesSta
import
org.apache.iotdb.db.queryengine.plan.statement.sys.ExplainAnalyzeStatement;
import org.apache.iotdb.db.queryengine.plan.statement.sys.ShowQueriesStatement;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
@@ -75,6 +78,7 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
// Record FragmentInstances dispatched to same DataNode
private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap;
+ private final ClusterTopology topology = ClusterTopology.getInstance();
public SimpleFragmentParallelPlanner(
SubPlan subPlan, Analysis analysis, MPPQueryContext context) {
@@ -152,6 +156,13 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
// Get the target region for origin PlanFragment, then its instance will
be distributed one
// of them.
TRegionReplicaSet regionReplicaSet =
fragment.getTargetRegionForTreeModel();
+ if (regionReplicaSet != null
+ && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
+ regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet);
+ if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+ throw new
ReplicaSetUnreachableException(fragment.getTargetRegionForTreeModel());
+ }
+ }
// Set ExecutorType and target host for the instance
// We need to store all the replica host in case of the scenario that the
instance need to be
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
index 52d85471590..1e5c0567825 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/WriteFragmentParallelPlanner.java
@@ -19,8 +19,10 @@
package org.apache.iotdb.db.queryengine.plan.planner.distribution;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.partition.StorageExecutor;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
import org.apache.iotdb.db.queryengine.plan.analyze.IAnalysis;
import org.apache.iotdb.db.queryengine.plan.planner.IFragmentParallelPlaner;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
@@ -39,6 +41,7 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
private IAnalysis analysis;
private MPPQueryContext queryContext;
private BiFunction<WritePlanNode, IAnalysis, List<WritePlanNode>>
nodeSplitter;
+ private final ClusterTopology topology = ClusterTopology.getInstance();
public WriteFragmentParallelPlanner(
SubPlan subPlan, IAnalysis analysis, MPPQueryContext queryContext) {
@@ -80,7 +83,9 @@ public class WriteFragmentParallelPlanner implements
IFragmentParallelPlaner {
Long.MAX_VALUE,
queryContext.getSession());
if (split.getRegionReplicaSet() != null) {
- instance.setExecutorAndHost(new
StorageExecutor(split.getRegionReplicaSet()));
+ final TRegionReplicaSet validSet =
+ topology.getValidatedReplicaSet(split.getRegionReplicaSet());
+ instance.setExecutorAndHost(new StorageExecutor(validSet));
}
ret.add(instance);
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java
similarity index 55%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java
index 648762f7c4b..1e5a0319256 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/ReplicaSetUnreachableException.java
@@ -17,20 +17,21 @@
* under the License.
*/
-package org.apache.iotdb.commons.partition;
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.rpc.TSStatusCode;
-/** The interface is used to indicate where to execute a FragmentInstance */
-public interface ExecutorType {
-
- /** Indicate if ExecutorType is StorageExecutor */
- boolean isStorageExecutor();
-
- TDataNodeLocation getDataNodeLocation();
-
- default TRegionReplicaSet getRegionReplicaSet() {
- throw new UnsupportedOperationException(getClass().getName());
+/**
+ * When ALL DataNodeLocations in a QUERY-typed {@link
+ * org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance} are
unreachable, possibly due
+ * to network partition issues, this exception will be thrown and this query
will fail.
+ */
+public class ReplicaSetUnreachableException extends IoTDBRuntimeException {
+ public ReplicaSetUnreachableException(TRegionReplicaSet replicaSet) {
+ super(
+ "All replica cannot be reached:" + replicaSet.toString(),
+ TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
}
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java
similarity index 52%
copy from
iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
copy to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java
index 648762f7c4b..15b6c640aa0 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/exceptions/RootFIPlacementException.java
@@ -17,20 +17,23 @@
* under the License.
*/
-package org.apache.iotdb.commons.partition;
+package org.apache.iotdb.db.queryengine.plan.planner.exceptions;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.rpc.TSStatusCode;
-/** The interface is used to indicate where to execute a FragmentInstance */
-public interface ExecutorType {
+import java.util.Collection;
- /** Indicate if ExecutorType is StorageExecutor */
- boolean isStorageExecutor();
-
- TDataNodeLocation getDataNodeLocation();
-
- default TRegionReplicaSet getRegionReplicaSet() {
- throw new UnsupportedOperationException(getClass().getName());
+/**
+ * Thrown during the planning phase of a query when no DataNode can host the root
+ * FragmentInstance, that is, when no DataNode can reach all replica sets, possibly due to
+ * network partition issues. The query fails with this exception.
+ */
+public class RootFIPlacementException extends IoTDBRuntimeException {
+ public RootFIPlacementException(Collection<TRegionReplicaSet> replicaSets) {
+ super(
+ "root FragmentInstance placement error: " + replicaSets.toString(),
+ TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
index 7a5afed6d8b..69d2a77fe3e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/FragmentInstance.java
@@ -142,7 +142,7 @@ public class FragmentInstance implements IConsensusRequest {
return;
}
this.executorType = executorType;
- this.hostDataNode = executorType.getDataNodeLocation();
+ this.hostDataNode = executorType.getDataNodeLocation().orElse(null);
}
// Although the HostDataNode is set in method setDataRegionAndHost(),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 0baefe039b5..7d4c752d0ff 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -30,8 +30,10 @@ import org.apache.iotdb.commons.utils.TimePartitionUtils;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
import org.apache.iotdb.db.queryengine.plan.planner.TableOperatorGenerator;
import
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
+import
org.apache.iotdb.db.queryengine.plan.planner.exceptions.RootFIPlacementException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
@@ -125,6 +127,7 @@ public class TableDistributedPlanGenerator
private final SymbolAllocator symbolAllocator;
private final Map<PlanNodeId, OrderingScheme> nodeOrderingMap = new
HashMap<>();
private final DataNodeLocationSupplierFactory.DataNodeLocationSupplier
dataNodeLocationSupplier;
+ private final ClusterTopology topology = ClusterTopology.getInstance();
public TableDistributedPlanGenerator(
final MPPQueryContext queryContext,
@@ -520,7 +523,7 @@ public class TableDistributedPlanGenerator
TRegionReplicaSet mostUsedDataRegion = null;
int maxDeviceEntrySizeOfTableScan = 0;
for (final Map.Entry<TRegionReplicaSet, DeviceTableScanNode> entry :
- tableScanNodeMap.entrySet()) {
+ topology.filterReachableCandidates(tableScanNodeMap.entrySet())) {
final DeviceTableScanNode subDeviceTableScanNode = entry.getValue();
resultTableScanNodeList.add(subDeviceTableScanNode);
@@ -530,6 +533,9 @@ public class TableDistributedPlanGenerator
maxDeviceEntrySizeOfTableScan =
subDeviceTableScanNode.getDeviceEntries().size();
}
}
+ if (mostUsedDataRegion == null) {
+ throw new RootFIPlacementException(tableScanNodeMap.keySet());
+ }
context.mostUsedRegion = mostUsedDataRegion;
if (!context.hasSortProperty) {
@@ -636,7 +642,7 @@ public class TableDistributedPlanGenerator
for (Map.Entry<
TRegionReplicaSet,
Pair<TreeAlignedDeviceViewScanNode,
TreeNonAlignedDeviceViewScanNode>>
- entry : tableScanNodeMap.entrySet()) {
+ entry :
topology.filterReachableCandidates(tableScanNodeMap.entrySet())) {
TRegionReplicaSet regionReplicaSet = entry.getKey();
Pair<TreeAlignedDeviceViewScanNode, TreeNonAlignedDeviceViewScanNode>
pair = entry.getValue();
int currentDeviceEntrySize = 0;
@@ -656,6 +662,9 @@ public class TableDistributedPlanGenerator
maxDeviceEntrySizeOfTableScan = currentDeviceEntrySize;
}
}
+ if (mostUsedDataRegion == null) {
+ throw new RootFIPlacementException(tableScanNodeMap.keySet());
+ }
context.mostUsedRegion = mostUsedDataRegion;
if (!context.hasSortProperty) {
@@ -813,7 +822,8 @@ public class TableDistributedPlanGenerator
List<PlanNode> resultTableScanNodeList = new ArrayList<>();
TRegionReplicaSet mostUsedDataRegion = null;
int maxDeviceEntrySizeOfTableScan = 0;
- for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry :
regionNodeMap.entrySet()) {
+ for (Map.Entry<TRegionReplicaSet, AggregationTableScanNode> entry :
+ topology.filterReachableCandidates(regionNodeMap.entrySet())) {
DeviceTableScanNode subDeviceTableScanNode = entry.getValue();
resultTableScanNodeList.add(subDeviceTableScanNode);
@@ -823,6 +833,9 @@ public class TableDistributedPlanGenerator
maxDeviceEntrySizeOfTableScan =
subDeviceTableScanNode.getDeviceEntries().size();
}
}
+ if (mostUsedDataRegion == null) {
+ throw new RootFIPlacementException(regionNodeMap.keySet());
+ }
context.mostUsedRegion = mostUsedDataRegion;
if (context.hasSortProperty) {
@@ -1324,6 +1337,9 @@ public class TableDistributedPlanGenerator
maxDeviceEntrySizeOfTableScan =
subTableDeviceFetchNode.getDeviceIdList().size();
}
}
+ if (mostUsedSchemaRegion == null) {
+ throw new RootFIPlacementException(tableDeviceFetchMap.keySet());
+ }
context.mostUsedRegion = mostUsedSchemaRegion;
return res;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
index 8141acbe3a3..2d152dc49f5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelQueryFragmentPlanner.java
@@ -29,8 +29,10 @@ import
org.apache.iotdb.db.queryengine.common.DataNodeEndPoints;
import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
import
org.apache.iotdb.db.queryengine.execution.exchange.sink.DownStreamChannelLocation;
+import org.apache.iotdb.db.queryengine.plan.ClusterTopology;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import
org.apache.iotdb.db.queryengine.plan.planner.distribution.NodeDistribution;
+import
org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
import org.apache.iotdb.db.queryengine.plan.planner.plan.PlanFragment;
import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
@@ -43,6 +45,7 @@ import
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.CountDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ShowDevice;
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Statement;
+import org.apache.commons.collections4.CollectionUtils;
import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,6 +77,7 @@ public class TableModelQueryFragmentPlanner {
// Record FragmentInstances dispatched to same DataNode
private final Map<TDataNodeLocation, List<FragmentInstance>> dataNodeFIMap =
new HashMap<>();
+ private final ClusterTopology topology = ClusterTopology.getInstance();
private final Map<PlanNodeId, NodeDistribution> nodeDistributionMap;
@@ -124,6 +128,14 @@ public class TableModelQueryFragmentPlanner {
// Get the target region for origin PlanFragment, then its instance will
be distributed one
// of them.
TRegionReplicaSet regionReplicaSet =
fragment.getTargetRegionForTableModel(nodeDistributionMap);
+ if (regionReplicaSet != null
+ && !CollectionUtils.isEmpty(regionReplicaSet.getDataNodeLocations())) {
+ regionReplicaSet = topology.getValidatedReplicaSet(regionReplicaSet);
+ if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+ throw new ReplicaSetUnreachableException(
+ fragment.getTargetRegionForTableModel(nodeDistributionMap));
+ }
+ }
// Set ExecutorType and target host for the instance,
// We need to store all the replica host in case of the scenario that the
instance need to be
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index db0c270d176..5aef2e3176c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -204,6 +204,11 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
List<FragmentInstance> localInstances = new ArrayList<>();
List<FragmentInstance> remoteInstances = new ArrayList<>();
for (FragmentInstance instance : instances) {
+ if (instance.getHostDataNode() == null) {
+ dataNodeFailureList.add(
+ new
TSStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()));
+ continue;
+ }
TEndPoint endPoint = instance.getHostDataNode().getInternalEndPoint();
if (isDispatchedToLocal(endPoint)) {
localInstances.add(instance);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
index 38d4d86d24e..49a8086d635 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ErrorHandlingUtils.java
@@ -30,6 +30,8 @@ import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.exception.query.QueryTimeoutRuntimeException;
import org.apache.iotdb.db.exception.sql.SemanticException;
import org.apache.iotdb.db.protocol.thrift.OperationType;
+import
org.apache.iotdb.db.queryengine.plan.planner.exceptions.ReplicaSetUnreachableException;
+import
org.apache.iotdb.db.queryengine.plan.planner.exceptions.RootFIPlacementException;
import org.apache.iotdb.rpc.RpcUtils;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -111,7 +113,8 @@ public class ErrorHandlingUtils {
|| status.getCode() ==
TSStatusCode.TABLE_ALREADY_EXISTS.getStatusCode()
|| status.getCode() ==
TSStatusCode.COLUMN_NOT_EXISTS.getStatusCode()
|| status.getCode() ==
TSStatusCode.COLUMN_ALREADY_EXISTS.getStatusCode()
- || status.getCode() ==
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()) {
+ || status.getCode() ==
TSStatusCode.UDF_LOAD_CLASS_ERROR.getStatusCode()
+ || status.getCode() ==
TSStatusCode.PLAN_FAILED_NETWORK_PARTITION.getStatusCode()) {
LOGGER.info(message);
} else {
LOGGER.warn(message, e);
@@ -151,6 +154,10 @@ public class ErrorHandlingUtils {
} else if (t instanceof QueryInBatchStatementException) {
return RpcUtils.getStatus(
TSStatusCode.QUERY_NOT_ALLOWED, INFO_NOT_ALLOWED_IN_BATCH_ERROR +
rootCause.getMessage());
+ } else if (t instanceof RootFIPlacementException) {
+ return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION,
rootCause.getMessage());
+ } else if (t instanceof ReplicaSetUnreachableException) {
+ return RpcUtils.getStatus(TSStatusCode.PLAN_FAILED_NETWORK_PARTITION,
rootCause.getMessage());
} else if (t instanceof IoTDBException) {
return RpcUtils.getStatus(((IoTDBException) t).getErrorCode(),
rootCause.getMessage());
} else if (t instanceof TsFileRuntimeException) {
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
index 169c07d59d9..fad0efa75b8 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientPoolFactory.java
@@ -131,6 +131,7 @@ public class ClientPoolFactory {
.setConnectionTimeoutMs(conf.getDnConnectionTimeoutInMS())
.setRpcThriftCompressionEnabled(conf.isRpcThriftCompressionEnabled())
.setSelectorNumOfAsyncClientManager(conf.getSelectorNumOfClientManager())
+ .setPrintLogWhenEncounterException(false)
.build(),
ThreadName.ASYNC_DATANODE_CLIENT_POOL.getName()),
new
ClientPoolProperty.Builder<AsyncDataNodeInternalServiceClient>()
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
index aa1ca3fe4d4..8053b677c78 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/AsyncRequestManager.java
@@ -118,6 +118,14 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
final AsyncRequestContext<?, ?, RequestType, NodeLocation>
requestContext,
final int retryNum,
final Long timeoutInMs) {
+ sendAsyncRequest(requestContext, retryNum, timeoutInMs, false);
+ }
+
+ public void sendAsyncRequest(
+ final AsyncRequestContext<?, ?, RequestType, NodeLocation>
requestContext,
+ final int retryNum,
+ final Long timeoutInMs,
+ final boolean keepSilent) {
if (requestContext.getRequestIndices().isEmpty()) {
return;
}
@@ -155,7 +163,7 @@ public abstract class AsyncRequestManager<RequestType, NodeLocation, Client> {
}
}
- if (!requestContext.getRequestIndices().isEmpty()) {
+ if (!requestContext.getRequestIndices().isEmpty() && !keepSilent) {
LOGGER.warn(
"Failed to {} after {} retries, requestIndices: {}",
requestType,
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
new file mode 100644
index 00000000000..f00de855eb8
--- /dev/null
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/DataNodeIntraHeartbeatRequestManager.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.commons.client.request;
+
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import
org.apache.iotdb.commons.client.async.AsyncDataNodeInternalServiceClient;
+
+public abstract class DataNodeIntraHeartbeatRequestManager<RequestType>
+ extends AsyncRequestManager<
+ RequestType, TDataNodeLocation, AsyncDataNodeInternalServiceClient> {
+
+ @Override
+ protected void initClientManager() {
+ clientManager =
+ new IClientManager.Factory<TEndPoint,
AsyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.AsyncDataNodeHeartbeatServiceClientPoolFactory());
+ }
+
+ @Override
+ protected TEndPoint nodeLocationToEndPoint(TDataNodeLocation
dataNodeLocation) {
+ return dataNodeLocation.getInternalEndPoint();
+ }
+}
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
index 310c74e60a1..33dcb712a20 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/request/TestConnectionUtils.java
@@ -67,7 +67,8 @@ public class TestConnectionUtils {
.forEach(
(nodeId, status) -> {
TEndPoint endPoint =
getEndPoint.apply(anotherNodeLocationMap.get(nodeId));
- TServiceProvider serviceProvider = new
TServiceProvider(endPoint, serviceType);
+ TServiceProvider serviceProvider =
+ new TServiceProvider(endPoint, serviceType, nodeId);
TTestConnectionResult result = new TTestConnectionResult();
result.setSender(sender);
result.setServiceProvider(serviceProvider);
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 3965622078c..577b9bb795b 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -82,6 +82,7 @@ public enum ThreadName {
CONFIG_NODE_SIMPLE_CONSENSUS_WAL_FLUSH("ConfigNode-Simple-Consensus-WAL-Flush-Thread"),
// -------------------------- ConfigNode-Heartbeat --------------------------
CONFIG_NODE_HEART_BEAT_SERVICE("Cluster-Heartbeat-Service"),
+ CONFIG_NODE_TOPOLOGY_SERVICE("Topology-Service"),
ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL("AsyncConfigNodeHeartbeatServiceClientPool"),
ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL("AsyncDataNodeHeartbeatServiceClientPool"),
// -------------------------- ConfigNode-LoadBalance
--------------------------
@@ -340,6 +341,7 @@ public enum ThreadName {
new HashSet<>(
Arrays.asList(
CONFIG_NODE_HEART_BEAT_SERVICE,
+ CONFIG_NODE_TOPOLOGY_SERVICE,
ASYNC_CONFIGNODE_HEARTBEAT_CLIENT_POOL,
ASYNC_DATANODE_HEARTBEAT_CLIENT_POOL));
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
index 648762f7c4b..1fe95764287 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/ExecutorType.java
@@ -22,13 +22,19 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import java.util.Optional;
+
/** The interface is used to indicate where to execute a FragmentInstance */
public interface ExecutorType {
/** Indicate if ExecutorType is StorageExecutor */
boolean isStorageExecutor();
- TDataNodeLocation getDataNodeLocation();
+ /**
+ * @return Optional.empty() iff {@link #isStorageExecutor()} and all
candidate replica locations
+ * are unreachable
+ */
+ Optional<TDataNodeLocation> getDataNodeLocation();
default TRegionReplicaSet getRegionReplicaSet() {
throw new UnsupportedOperationException(getClass().getName());
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
index 4255c3955d8..7c53d7631f4 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/QueryExecutor.java
@@ -22,7 +22,10 @@ package org.apache.iotdb.commons.partition;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.tsfile.utils.Preconditions;
+
import java.util.Objects;
+import java.util.Optional;
/** QueryExecutor indicates this query can execute directly without data from
StorageEngine */
public class QueryExecutor implements ExecutorType {
@@ -33,8 +36,9 @@ public class QueryExecutor implements ExecutorType {
}
@Override
- public TDataNodeLocation getDataNodeLocation() {
- return dataNodeLocation;
+ public Optional<TDataNodeLocation> getDataNodeLocation() {
+ Preconditions.checkArgument(dataNodeLocation != null);
+ return Optional.of(dataNodeLocation);
}
@Override
diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
index a99b4dea07e..4c256962524 100644
--- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
+++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/partition/StorageExecutor.java
@@ -23,8 +23,10 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
import java.util.Objects;
+import java.util.Optional;
/** StorageExecutor indicates execution of this query need data from
StorageEngine */
public class StorageExecutor implements ExecutorType {
@@ -35,8 +37,12 @@ public class StorageExecutor implements ExecutorType {
}
@Override
- public TDataNodeLocation getDataNodeLocation() {
- return regionReplicaSet.getDataNodeLocations().get(0);
+ @Nullable
+ public Optional<TDataNodeLocation> getDataNodeLocation() {
+ if (regionReplicaSet.getDataNodeLocations().isEmpty()) {
+ return Optional.empty();
+ }
+ return Optional.of(regionReplicaSet.getDataNodeLocations().get(0));
}
@Override
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index 8c2b8cedb46..161c8525cca 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -229,6 +229,7 @@ enum TServiceType {
struct TServiceProvider {
1: required TEndPoint endPoint
2: required TServiceType serviceType
+ 3: required i32 nodeId
}
struct TSender {
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 34ec8f848c4..7008a983428 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -277,6 +277,8 @@ struct TDataNodeHeartbeatReq {
9: optional i64 deviceQuotaRemain
10: optional TDataNodeActivation activation
11: optional set<common.TEndPoint> configNodeEndPoints
+ 12: optional map<i32, common.TDataNodeLocation> dataNodes
+ 13: optional map<i32, set<i32>> topology
}
struct TDataNodeActivation {
@@ -1177,6 +1179,8 @@ service IDataNodeRPCService {
common.TTestConnectionResp submitTestConnectionTask(common.TNodeLocations
nodeLocations)
+ common.TTestConnectionResp
submitInternalTestConnectionTask(common.TNodeLocations nodeLocations)
+
/** Empty rpc, only for connection test */
common.TSStatus testConnectionEmptyRPC()
}