This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 18df9a986be Region status Adding and Removing (#12342)
18df9a986be is described below
commit 18df9a986be234960b12f44dd571620ba30a3978
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Apr 18 10:01:36 2024 +0800
Region status Adding and Removing (#12342)
* basically done
* self review
* gg
---
.../handlers/heartbeat/DataNodeHeartbeatHandler.java | 3 ++-
.../apache/iotdb/confignode/manager/load/LoadManager.java | 8 +++++---
.../iotdb/confignode/manager/load/cache/LoadCache.java | 7 +++++--
.../confignode/manager/load/cache/region/RegionCache.java | 15 +++++++++++++++
.../manager/load/cache/region/RegionGroupCache.java | 12 ++++++++++--
.../confignode/procedure/env/RegionMaintainHandler.java | 15 +++++++++++++--
.../procedure/impl/region/AddRegionPeerProcedure.java | 7 ++++++-
.../procedure/impl/region/RemoveRegionPeerProcedure.java | 3 +++
.../org/apache/iotdb/commons/cluster/RegionStatus.java | 12 +++---------
9 files changed, 62 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
index 09a0e9c6383..49981d72834 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/heartbeat/DataNodeHeartbeatHandler.java
@@ -100,7 +100,8 @@ public class DataNodeHeartbeatHandler implements
AsyncMethodCallback<TDataNodeHe
new RegionHeartbeatSample(
heartbeatResp.getHeartbeatTimestamp(),
// Region will inherit DataNode's status
- RegionStatus.parse(heartbeatResp.getStatus())));
+ RegionStatus.valueOf(heartbeatResp.getStatus())),
+ false);
if
(((TConsensusGroupType.SchemaRegion.equals(regionGroupId.getType())
&& SCHEMA_REGION_SHOULD_CACHE_CONSENSUS_SAMPLE)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
index 7aad164be55..895d89246b8 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/LoadManager.java
@@ -333,7 +333,7 @@ public class LoadManager {
regionHeartbeatSampleMap.forEach(
(dataNodeId, regionHeartbeatSample) ->
loadCache.cacheRegionHeartbeatSample(
- regionGroupId, dataNodeId, regionHeartbeatSample)));
+ regionGroupId, dataNodeId, regionHeartbeatSample,
false)));
loadCache.updateRegionGroupStatistics();
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
}
@@ -344,11 +344,13 @@ public class LoadManager {
* @param regionGroupId the specified RegionGroup
* @param dataNodeId the specified DataNode
*/
- public void forceAddRegionCache(TConsensusGroupId regionGroupId, int
dataNodeId) {
+ public void forceAddRegionCache(
+ TConsensusGroupId regionGroupId, int dataNodeId, RegionStatus
regionStatus) {
loadCache.cacheRegionHeartbeatSample(
regionGroupId,
dataNodeId,
- new RegionHeartbeatSample(System.nanoTime(), RegionStatus.Running));
+ new RegionHeartbeatSample(System.nanoTime(), regionStatus),
+ true);
loadCache.updateRegionGroupStatistics();
eventService.checkAndBroadcastRegionGroupStatisticsChangeEventIfNecessary();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
index 4a8a1a5f382..6fae8010e22 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/LoadCache.java
@@ -201,10 +201,13 @@ public class LoadCache {
* @param sample the latest heartbeat sample
*/
public void cacheRegionHeartbeatSample(
- TConsensusGroupId regionGroupId, int nodeId, RegionHeartbeatSample
sample) {
+ TConsensusGroupId regionGroupId,
+ int nodeId,
+ RegionHeartbeatSample sample,
+ boolean overwrite) {
regionGroupCacheMap
.computeIfAbsent(regionGroupId, empty -> new RegionGroupCache())
- .cacheHeartbeatSample(nodeId, sample);
+ .cacheHeartbeatSample(nodeId, sample, overwrite);
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
index ec496b478c7..2e68d370519 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionCache.java
@@ -57,4 +57,19 @@ public class RegionCache extends AbstractLoadCache {
public RegionStatistics getCurrentStatistics() {
return (RegionStatistics) currentStatistics.get();
}
+
+ public void cacheHeartbeatSample(RegionHeartbeatSample newHeartbeatSample,
boolean overwrite) {
+ if (overwrite || getLastSample() == null) {
+ super.cacheHeartbeatSample(newHeartbeatSample);
+ return;
+ }
+ RegionStatus lastStatus = ((RegionHeartbeatSample)
getLastSample()).getStatus();
+ if (lastStatus.equals(RegionStatus.Adding) ||
lastStatus.equals(RegionStatus.Removing)) {
+ RegionHeartbeatSample fakeHeartbeatSample =
+ new
RegionHeartbeatSample(newHeartbeatSample.getSampleLogicalTimestamp(),
lastStatus);
+ super.cacheHeartbeatSample(fakeHeartbeatSample);
+ } else {
+ super.cacheHeartbeatSample(newHeartbeatSample);
+ }
+ }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
index 532e955f62e..ae922dc0028 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.confignode.manager.load.cache.region;
import org.apache.iotdb.commons.cluster.RegionStatus;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.confignode.manager.partition.RegionGroupStatus;
import java.util.Map;
@@ -51,11 +52,18 @@ public class RegionGroupCache {
*
* @param dataNodeId Where the specified Region resides
* @param newHeartbeatSample The newest RegionHeartbeatSample
+ * @param overwrite Able to overwrite Adding or Removing
*/
- public void cacheHeartbeatSample(int dataNodeId, RegionHeartbeatSample
newHeartbeatSample) {
+ public void cacheHeartbeatSample(
+ int dataNodeId, RegionHeartbeatSample newHeartbeatSample, boolean
overwrite) {
regionCacheMap
.computeIfAbsent(dataNodeId, empty -> new RegionCache())
- .cacheHeartbeatSample(newHeartbeatSample);
+ .cacheHeartbeatSample(newHeartbeatSample, overwrite);
+ }
+
+ @TestOnly
+ public void cacheHeartbeatSample(int dataNodeId, RegionHeartbeatSample
newHeartbeatSample) {
+ cacheHeartbeatSample(dataNodeId, newHeartbeatSample, false);
}
/**
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index aa2760cad34..cfad4aefa00 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -31,6 +31,7 @@ import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
@@ -417,7 +418,8 @@ public class RegionMaintainHandler {
return report;
}
- public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation
newLocation) {
+ public void addRegionLocation(
+ TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus
regionStatus) {
AddRegionLocationPlan req = new AddRegionLocationPlan(regionId,
newLocation);
TSStatus status =
configManager.getPartitionManager().addRegionLocation(req);
LOGGER.info(
@@ -425,7 +427,16 @@ public class RegionMaintainHandler {
regionId,
getIdWithRpcEndpoint(newLocation),
status);
- configManager.getLoadManager().forceAddRegionCache(regionId,
newLocation.getDataNodeId());
+ configManager
+ .getLoadManager()
+ .forceAddRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
+ }
+
+ public void updateRegionCache(
+ TConsensusGroupId regionId, TDataNodeLocation newLocation, RegionStatus
regionStatus) {
+ configManager
+ .getLoadManager()
+ .forceAddRegionCache(regionId, newLocation.getDataNodeId(),
regionStatus);
}
public void removeRegionLocation(
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
index 9cefb239489..b380c7ca721 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/AddRegionPeerProcedure.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.confignode.procedure.impl.region;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -84,6 +85,7 @@ public class AddRegionPeerProcedure
outerSwitch:
switch (state) {
case CREATE_NEW_REGION_PEER:
+ handler.addRegionLocation(consensusGroupId, destDataNode,
RegionStatus.Adding);
TSStatus status = handler.createNewRegionPeer(consensusGroupId,
destDataNode);
setKillPoint(state);
if (status.getCode() != SUCCESS_STATUS.getStatusCode()) {
@@ -92,6 +94,7 @@ public class AddRegionPeerProcedure
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
break;
case DO_ADD_REGION_PEER:
+ handler.updateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Adding);
// We don't want to re-submit AddRegionPeerTask when leader change
or ConfigNode reboot
if (!this.isStateDeserialized()) {
TSStatus tsStatus =
@@ -127,7 +130,7 @@ public class AddRegionPeerProcedure
throw new UnsupportedOperationException(msg);
}
case UPDATE_REGION_LOCATION_CACHE:
- handler.addRegionLocation(consensusGroupId, destDataNode);
+ handler.updateRegionCache(consensusGroupId, destDataNode,
RegionStatus.Running);
setKillPoint(state);
LOGGER.info("AddRegionPeer state {} complete", state);
LOGGER.info(
@@ -147,6 +150,8 @@ public class AddRegionPeerProcedure
}
private void rollback(ConfigNodeProcedureEnv env, RegionMaintainHandler
handler) {
+ handler.removeRegionLocation(consensusGroupId, destDataNode);
+
List<TDataNodeLocation> correctDataNodeLocations =
env.getConfigManager().getPartitionManager().getAllReplicaSets().stream()
.filter(tRegionReplicaSet ->
tRegionReplicaSet.getRegionId().equals(consensusGroupId))
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
index 1e183a631e6..ef3a58994bd 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RemoveRegionPeerProcedure.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
@@ -79,6 +80,7 @@ public class RemoveRegionPeerProcedure
try {
switch (state) {
case REMOVE_REGION_PEER:
+ handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
tsStatus =
handler.submitRemoveRegionPeerTask(
this.getProcId(), targetDataNode, consensusGroupId,
coordinator);
@@ -100,6 +102,7 @@ public class RemoveRegionPeerProcedure
setNextState(DELETE_OLD_REGION_PEER);
break;
case DELETE_OLD_REGION_PEER:
+ handler.updateRegionCache(consensusGroupId, targetDataNode,
RegionStatus.Removing);
tsStatus =
handler.submitDeleteOldRegionPeerTask(
this.getProcId(), targetDataNode, consensusGroupId);
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
index ade6136fb24..b61a2d6a5f7 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/cluster/RegionStatus.java
@@ -27,6 +27,9 @@ public enum RegionStatus {
/** Region connection failure */
Unknown("Unknown"),
+ /** Region is the destination during an AddRegionPeerProcedure */
+ Adding("Adding"),
+
/** Region is in removing */
Removing("Removing"),
@@ -43,15 +46,6 @@ public enum RegionStatus {
return status;
}
- public static RegionStatus parse(String status) {
- for (RegionStatus regionStatus : RegionStatus.values()) {
- if (regionStatus.status.equals(status)) {
- return regionStatus;
- }
- }
- throw new RuntimeException(String.format("RegionStatus %s doesn't exist.",
status));
- }
-
public static boolean isNormalStatus(RegionStatus status) {
// Currently, the only normal status is Running
return status != null && status.equals(RegionStatus.Running);