This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch region_migration
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/region_migration by this push:
new b7be36ca595 [To region_migration] cn pull from dn, IT improve,
coordinator avoid submit duplicate (#12127)
b7be36ca595 is described below
commit b7be36ca5951de688c398a28647fb56277880da8
Author: Li Yu Heng <[email protected]>
AuthorDate: Thu Mar 7 20:44:29 2024 +0800
[To region_migration] cn pull from dn, IT improve, coordinator avoid submit
duplicate (#12127)
---
.../iotdb/it/env/cluster/env/AbstractEnv.java | 2 +-
.../it/env/cluster/node/AbstractNodeWrapper.java | 14 +-
.../it/IoTDBRegionMigrateReliabilityIT.java | 263 ++++++++++++---------
.../confignode/it/procedure/IoTDBProcedureIT.java | 4 +-
.../iotdb/confignode/manager/ConfigManager.java | 11 -
.../apache/iotdb/confignode/manager/IManager.java | 9 -
.../iotdb/confignode/manager/ProcedureManager.java | 37 +--
.../iotdb/confignode/manager/node/NodeManager.java | 6 +-
.../procedure/env/ConfigNodeProcedureEnv.java | 6 +-
...moveHandler.java => RegionMaintainHandler.java} | 51 +++-
.../impl/node/RemoveDataNodeProcedure.java | 4 +-
.../impl/statemachine/AddRegionPeerProcedure.java | 89 ++-----
.../impl/statemachine/RegionMigrateProcedure.java | 10 +-
.../statemachine/RemoveRegionPeerProcedure.java | 95 ++------
.../thrift/ConfigNodeRPCServiceProcessor.java | 6 -
.../iotdb/db/protocol/client/ConfigNodeClient.java | 7 -
.../impl/DataNodeInternalRPCServiceImpl.java | 6 +
.../iotdb/db/service/RegionMigrateService.java | 141 ++++++-----
.../thrift-commons/src/main/thrift/common.thrift | 15 ++
.../src/main/thrift/confignode.thrift | 9 -
.../src/main/thrift/datanode.thrift | 6 +
21 files changed, 359 insertions(+), 432 deletions(-)
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 257d813982c..569059c56a0 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -344,7 +344,7 @@ public abstract class AbstractEnv implements BaseEnv {
for (AbstractNodeWrapper nodeWrapper :
Stream.concat(this.dataNodeWrapperList.stream(),
this.configNodeWrapperList.stream())
.collect(Collectors.toList())) {
- nodeWrapper.stop();
+ nodeWrapper.stopForcibly();
nodeWrapper.destroyDir();
String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
if (!new File(lockPath).delete()) {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index dbc43f8fc41..a27d57e4720 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -475,15 +475,19 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
if (this.instance == null) {
return;
}
+ logger.info("Node {} will be shutdown soon", this.getPort());
this.instance.destroy();
try {
if (!this.instance.waitFor(20, TimeUnit.SECONDS)) {
+ logger.warn("Node {} will be shutdown forcibly soon", this.getPort());
this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Waiting node to shutdown error. %s", e);
+ logger.error("Waiting node to shutdown error.", e);
+ return;
}
+ logger.info("Node {} has been shutdown", this.getPort());
}
@Override
@@ -492,12 +496,14 @@ public abstract class AbstractNodeWrapper implements
BaseNodeWrapper {
return;
}
try {
- this.instance.destroyForcibly().waitFor(5, TimeUnit.SECONDS);
- logger.info("Node {} has been successfully forcibly stopped", nodePort);
+ logger.info("Node {} will be shutdown forcibly soon", this.getPort());
+ this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- logger.error("Waiting node to shutdown error. %s", e);
+ logger.error("Waiting node to shutdown error.", e);
+ return;
}
+ logger.info("Node {} has been shutdown", this.getPort());
}
@Override
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
index 18ddac96ac5..812f393e86c 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/IoTDBRegionMigrateReliabilityIT.java
@@ -23,12 +23,16 @@ import
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
import org.apache.iotdb.consensus.ConsensusFactory;
import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.env.cluster.node.AbstractNodeWrapper;
+import org.apache.iotdb.it.env.cluster.node.ConfigNodeWrapper;
import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionTimeoutException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -44,14 +48,18 @@ import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
-import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
public class IoTDBRegionMigrateReliabilityIT {
private static final Logger LOGGER =
@@ -81,61 +89,95 @@ public class IoTDBRegionMigrateReliabilityIT {
@Test
public void normal1C2DTest() throws Exception {
- EnvFactory.getEnv()
- .getConfig()
- .getCommonConfig()
- .setDataReplicationFactor(1)
- .setSchemaReplicationFactor(1);
- EnvFactory.getEnv().initClusterEnvironment(1, 2);
+ generalTest(1, 1, 1, 2, Collections.emptySet(), Collections.emptySet());
+ }
- try (final Connection connection = EnvFactory.getEnv().getConnection();
- final Statement statement = connection.createStatement()) {
+ @Test
+ public void normal3C3DTest() throws Exception {
+ generalTest(2, 3, 3, 3, Collections.emptySet(), Collections.emptySet());
+ }
- statement.execute(INSERTION);
+ // endregion
- ResultSet result = statement.executeQuery(SHOW_REGIONS);
- Map<Integer, Set<Integer>> regionMap = getRegionMap(result);
+ // region ConfigNode crash tests
+ @Test
+ public void cnCrashDuringPreCheck() throws Exception {
+ generalTest(
+ 1,
+ 1,
+ 1,
+ 2,
+ Stream.of(RegionTransitionState.REGION_MIGRATE_PREPARE.toString())
+ .collect(Collectors.toSet()),
+ Collections.emptySet());
+ }
- result = statement.executeQuery(SHOW_DATANODES);
- Set<Integer> dataNodeSet = new HashSet<>();
- while (result.next()) {
- dataNodeSet.add(result.getInt(ColumnHeaderConstant.NODE_ID));
- }
+ @Test
+ public void cnCrashDuringCreatePeer() throws Exception {
+ generalTest(
+ 1,
+ 1,
+ 1,
+ 2,
+
Stream.of(AddRegionPeerState.CREATE_NEW_REGION_PEER.toString()).collect(Collectors.toSet()),
+ Collections.emptySet());
+ }
- final int selectedRegion = selectRegion(regionMap);
- final int originalDataNode = selectOriginalDataNode(regionMap,
selectedRegion);
- final int destDataNode = selectDestDataNode(dataNodeSet, regionMap,
selectedRegion);
+ @Test
+ public void cnCrashDuringDoAddPeer() throws Exception {
+ generalTest(
+ 1,
+ 1,
+ 1,
+ 2,
+
Stream.of(AddRegionPeerState.DO_ADD_REGION_PEER.toString()).collect(Collectors.toSet()),
+ Collections.emptySet());
+ }
- // set breakpoint
- HashMap<String, Runnable> keywordAction = new HashMap<>();
- Arrays.stream(RegionTransitionState.values())
- .forEach(
- state ->
- keywordAction.put(
- String.valueOf(state), () ->
LOGGER.info(String.valueOf(state))));
- ExecutorService service =
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
- LOGGER.info("breakpoint setting...");
- service.submit(() -> logBreakpointMonitor(0, keywordAction));
- LOGGER.info("breakpoint set");
+ @Test
+ public void cnCrashDuring() throws Exception {
+ generalTest(
+ 1,
+ 1,
+ 1,
+ 2,
+ Stream.of(AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE.toString())
+ .collect(Collectors.toSet()),
+ Collections.emptySet());
+ }
- statement.execute(regionMigrateCommand(selectedRegion, originalDataNode,
destDataNode));
+ // TODO: other cn crash test
- awaitUntilSuccess(statement, selectedRegion, originalDataNode,
destDataNode);
+ // endregion
- checkRegionFileClear(originalDataNode);
+ // region DataNode crash tests
- LOGGER.info("test pass");
- }
- }
+ // endregion
- @Test
- public void normal3C3DTest() throws Exception {
+ // region Helpers
+
+ public void generalTest(
+ final int dataReplicateFactor,
+ final int schemaReplicationFactor,
+ final int configNodeNum,
+ final int dataNodeNum,
+ Set<String> killConfigNodeKeywords,
+ Set<String> killDataNodeKeywords // TODO: this parameter does not take effect yet
+ ) throws Exception {
+ // prepare env
EnvFactory.getEnv()
.getConfig()
.getCommonConfig()
- .setDataReplicationFactor(2)
- .setSchemaReplicationFactor(3);
- EnvFactory.getEnv().initClusterEnvironment(3, 3);
+ .setDataReplicationFactor(dataReplicateFactor)
+ .setSchemaReplicationFactor(schemaReplicationFactor);
+ EnvFactory.getEnv().initClusterEnvironment(configNodeNum, dataNodeNum);
+
+ ExecutorService service =
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
+ EnvFactory.getEnv()
+ .getConfigNodeWrapperList()
+ .forEach(
+ configNodeWrapper ->
+ service.submit(() -> nodeLogKillPoint(configNodeWrapper,
killConfigNodeKeywords)));
try (final Connection connection = EnvFactory.getEnv().getConnection();
final Statement statement = connection.createStatement();
@@ -157,94 +199,62 @@ public class IoTDBRegionMigrateReliabilityIT {
final int originalDataNode = selectOriginalDataNode(regionMap,
selectedRegion);
final int destDataNode = selectDestDataNode(dataNodeSet, regionMap,
selectedRegion);
- // set breakpoint
- HashMap<String, Runnable> keywordAction = new HashMap<>();
- Arrays.stream(RegionTransitionState.values())
- .forEach(
- state ->
- keywordAction.put(
- String.valueOf(state), () ->
LOGGER.info(String.valueOf(state))));
- ExecutorService service =
IoTDBThreadPoolFactory.newCachedThreadPool("regionMigrateIT");
- LOGGER.info("breakpoint setting...");
- service.submit(() -> logBreakpointMonitor(0, keywordAction));
- service.submit(() -> logBreakpointMonitor(1, keywordAction));
- service.submit(() -> logBreakpointMonitor(2, keywordAction));
- LOGGER.info("breakpoint set");
-
statement.execute(regionMigrateCommand(selectedRegion, originalDataNode,
destDataNode));
awaitUntilSuccess(statement, selectedRegion, originalDataNode,
destDataNode);
checkRegionFileClear(originalDataNode);
-
- LOGGER.info("test pass");
}
+ LOGGER.info("test pass");
}
- // endregion
-
- // region ConfigNode crash tests
- @Test
- public void cnCrashDuringPreCheck() {}
-
- @Test
- public void cnCrashDuringCreatePeer() {}
-
- @Test
- public void cnCrashDuringAddPeer() {}
-
- // TODO: other cn crash test
-
- // endregion
-
- // region DataNode crash tests
-
- // endregion
-
- // region Helpers
-
/**
* Monitor the node's log and do something.
*
- * @param nodeIndex
- * @param keywordAction Map<keyword, action>
+ * @param nodeWrapper the node whose log file is tailed and which is restarted on a match
+ * @param killNodeKeywords when one of these keywords is detected in the node's log, the node is stopped forcibly and started again
*/
- private static void logBreakpointMonitor(int nodeIndex, HashMap<String,
Runnable> keywordAction) {
+ private static void nodeLogKillPoint(
+ AbstractNodeWrapper nodeWrapper, Set<String> killNodeKeywords) {
+ if (killNodeKeywords.isEmpty()) {
+ return;
+ }
+ final String logFileName;
+ if (nodeWrapper instanceof ConfigNodeWrapper) {
+ logFileName = "log_confignode_all.log";
+ } else {
+ logFileName = "log_datanode_all.log";
+ }
ProcessBuilder builder =
new ProcessBuilder(
"tail",
"-f",
- EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).getNodePath()
- + File.separator
- + "logs"
- + File.separator
- + "log_confignode_all.log");
- builder.redirectErrorStream(true); // 将错误输出和标准输出合并
+ nodeWrapper.getNodePath() + File.separator + "logs" +
File.separator + logFileName);
+ builder.redirectErrorStream(true);
try {
- Process process = builder.start(); // 开始执行命令
- // 读取命令的输出
+ Process process = builder.start();
try (BufferedReader reader =
new BufferedReader(new InputStreamReader(process.getInputStream())))
{
String line;
while ((line = reader.readLine()) != null) {
- Set<String> detected = new HashSet<>();
+ // if a single log line matches more than one keyword, the test code probably has a mistake
+
Assert.assertTrue(killNodeKeywords.stream().filter(line::contains).count() <=
1);
String finalLine = line;
- keywordAction
- .keySet()
- .forEach(
- k -> {
- if (finalLine.contains(k)) {
- detected.add(k);
- }
- });
- detected.forEach(
- k -> {
- keywordAction.get(k).run();
- //
- //
EnvFactory.getEnv().getConfigNodeWrapper(nodeIndex).stopForcibly();
- keywordAction.remove(k);
- });
+ Optional<String> detectedKeyword =
+ killNodeKeywords.stream()
+ .filter(keyword -> finalLine.contains("breakpoint:" +
keyword))
+ .findAny();
+ if (detectedKeyword.isPresent()) {
+ // reboot the node
+ nodeWrapper.stopForcibly();
+ nodeWrapper.start();
+ // each keyword only trigger once
+ killNodeKeywords.remove(detectedKeyword.get());
+ }
+ if (killNodeKeywords.isEmpty()) {
+ break;
+ }
}
}
} catch (IOException e) {
@@ -279,7 +289,7 @@ public class IoTDBRegionMigrateReliabilityIT {
Map<Integer, Set<Integer>> regionMap, int selectedRegion) {
return regionMap.get(selectedRegion).stream()
.findAny()
- .orElseThrow(() -> new RuntimeException("gg"));
+ .orElseThrow(() -> new RuntimeException("cannot find original
DataNode"));
}
private static int selectDestDataNode(
@@ -287,20 +297,37 @@ public class IoTDBRegionMigrateReliabilityIT {
return dataNodeSet.stream()
.filter(dataNodeId ->
!regionMap.get(selectedRegion).contains(dataNodeId))
.findAny()
- .orElseThrow(() -> new RuntimeException("gg"));
+ .orElseThrow(() -> new RuntimeException("cannot find dest DataNode"));
}
private static void awaitUntilSuccess(
Statement statement, int selectedRegion, int originalDataNode, int
destDataNode) {
- Awaitility.await()
- .atMost(1, TimeUnit.MINUTES)
- .until(
- () -> {
- Map<Integer, Set<Integer>> newRegionMap =
- getRegionMap(statement.executeQuery(SHOW_REGIONS));
- Set<Integer> dataNodes = newRegionMap.get(selectedRegion);
- return !dataNodes.contains(originalDataNode) &&
dataNodes.contains(destDataNode);
- });
+ AtomicReference<Set<Integer>> lastTimeDataNodes = new AtomicReference<>();
+ try {
+ Awaitility.await()
+ .atMost(1, TimeUnit.MINUTES)
+ .until(
+ () -> {
+ try {
+ Map<Integer, Set<Integer>> newRegionMap =
+ getRegionMap(statement.executeQuery(SHOW_REGIONS));
+ Set<Integer> dataNodes = newRegionMap.get(selectedRegion);
+ lastTimeDataNodes.set(dataNodes);
+ return !dataNodes.contains(originalDataNode) &&
dataNodes.contains(destDataNode);
+ } catch (Exception e) {
+ // Any exception can be ignored
+ return false;
+ }
+ });
+ } catch (ConditionTimeoutException e) {
+ // Set<Integer> expectation = new Set<>(lastTimeDataNodes);
+ String actualSetStr = lastTimeDataNodes.get().toString();
+ lastTimeDataNodes.get().remove(originalDataNode);
+ lastTimeDataNodes.get().add(destDataNode);
+ String expectSetStr = lastTimeDataNodes.toString();
+ LOGGER.info("DataNode Set {} is unexpected, expect {}", actualSetStr,
expectSetStr);
+ throw e;
+ }
}
/** Check whether the original DataNode's region file has been deleted. */
diff --git
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
index 4d0f388a33b..831879da0d2 100644
---
a/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/confignode/it/procedure/IoTDBProcedureIT.java
@@ -79,11 +79,13 @@ public class IoTDBProcedureIT {
@Test
public void procedureRecoverAtAnotherConfigNodeTest() throws Exception {
recoverTest(3, false);
+ LOGGER.info("test pass");
}
@Test
public void procedureRecoverAtTheSameConfigNodeTest() throws Exception {
recoverTest(1, true);
+ LOGGER.info("test pass");
}
private void recoverTest(int configNodeNum, boolean needRestartLeader)
throws Exception {
@@ -114,7 +116,7 @@ public class IoTDBProcedureIT {
Assert.assertTrue(resp.getDatabaseInfoMap().size() < MAX_STATE);
// Then shutdown the leader, wait the new leader exist and the procedure
continue
final int oldLeaderIndex = EnvFactory.getEnv().getLeaderConfigNodeIndex();
- EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stop();
+ EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).stopForcibly();
if (needRestartLeader) {
EnvFactory.getEnv().getConfigNodeWrapper(oldLeaderIndex).start();
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
index 4f45ea5587a..b4102c3ed6f 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ConfigManager.java
@@ -151,7 +151,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TNodeVersionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
@@ -411,16 +410,6 @@ public class ConfigManager implements IManager {
return status;
}
- @Override
- public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)
{
- TSStatus status = confirmLeader();
- if (status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- // TODO: 这里需要修改report机制,改为向AddRegionPeerProcedure汇报
- procedureManager.reportRegionMigrateResult(req);
- }
- return status;
- }
-
@Override
public DataSet getDataNodeConfiguration(
GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
index e9e310bc342..368eaf5fb8a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/IManager.java
@@ -91,7 +91,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetTriggerTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaPartitionTableResp;
@@ -242,14 +241,6 @@ public interface IManager {
*/
TSStatus reportDataNodeShutdown(TDataNodeLocation dataNodeLocation);
- /**
- * DataNode report region migrate result to ConfigNode when remove DataNode.
- *
- * @param req TRegionMigrateResultReportReq
- * @return TSStatus
- */
- TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req);
-
/**
* Get DataNode info.
*
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
index 5d10d8f7871..5a9b08f548e 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/ProcedureManager.java
@@ -51,7 +51,7 @@ import org.apache.iotdb.confignode.procedure.Procedure;
import org.apache.iotdb.confignode.procedure.ProcedureExecutor;
import org.apache.iotdb.confignode.procedure.ProcedureMetrics;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.impl.CreateManyDatabasesProcedure;
import org.apache.iotdb.confignode.procedure.impl.cq.CreateCQProcedure;
import org.apache.iotdb.confignode.procedure.impl.node.AddConfigNodeProcedure;
@@ -74,10 +74,8 @@ import
org.apache.iotdb.confignode.procedure.impl.schema.DeleteLogicalViewProced
import
org.apache.iotdb.confignode.procedure.impl.schema.DeleteTimeSeriesProcedure;
import org.apache.iotdb.confignode.procedure.impl.schema.SetTemplateProcedure;
import
org.apache.iotdb.confignode.procedure.impl.schema.UnsetTemplateProcedure;
-import
org.apache.iotdb.confignode.procedure.impl.statemachine.AddRegionPeerProcedure;
import
org.apache.iotdb.confignode.procedure.impl.statemachine.CreateRegionGroupsProcedure;
import
org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
-import
org.apache.iotdb.confignode.procedure.impl.statemachine.RemoveRegionPeerProcedure;
import org.apache.iotdb.confignode.procedure.impl.sync.AuthOperationProcedure;
import
org.apache.iotdb.confignode.procedure.impl.trigger.CreateTriggerProcedure;
import org.apache.iotdb.confignode.procedure.impl.trigger.DropTriggerProcedure;
@@ -96,7 +94,6 @@ import org.apache.iotdb.confignode.rpc.thrift.TDatabaseSchema;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteLogicalViewReq;
import org.apache.iotdb.confignode.rpc.thrift.TDeleteTimeSeriesReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.db.exception.BatchProcessException;
import org.apache.iotdb.db.schemaengine.template.Template;
import org.apache.iotdb.rpc.RpcUtils;
@@ -641,7 +638,7 @@ public class ProcedureManager {
}
// select coordinator for adding peer
- DataNodeRemoveHandler handler = new DataNodeRemoveHandler(configManager);
+ RegionMaintainHandler handler = new RegionMaintainHandler(configManager);
Optional<TDataNodeLocation> selectedDataNode =
handler.filterDataNodeWithOtherRegionReplica(regionGroupId,
destDataNode);
if (!selectedDataNode.isPresent()) {
@@ -1077,36 +1074,6 @@ public class ProcedureManager {
this.env = env;
}
- public void reportRegionMigrateResult(TRegionMigrateResultReportReq req) {
- // TODO: ugly, will fix soon
- this.executor
- .getProcedures()
- .values()
- .forEach(
- procedure1 -> {
- if (procedure1 instanceof AddRegionPeerProcedure) {
- AddRegionPeerProcedure procedure = (AddRegionPeerProcedure)
procedure1;
- if (procedure.getConsensusGroupId().equals(req.getRegionId()))
{
- procedure.notifyAddPeerFinished(req);
- }
- }
- });
-
- // TODO: ugly, will fix soon
- this.executor
- .getProcedures()
- .values()
- .forEach(
- procedure1 -> {
- if (procedure1 instanceof RemoveRegionPeerProcedure) {
- RemoveRegionPeerProcedure procedure =
(RemoveRegionPeerProcedure) procedure1;
- if (procedure.getConsensusGroupId().equals(req.getRegionId()))
{
- procedure.notifyRemovePeerFinished(req);
- }
- }
- });
- }
-
public void addMetrics() {
MetricService.getInstance().addMetricSet(this.procedureMetrics);
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
index d18f701d1fb..aa457e80a48 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/node/NodeManager.java
@@ -62,7 +62,7 @@ import
org.apache.iotdb.confignode.manager.partition.PartitionMetrics;
import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
import org.apache.iotdb.confignode.manager.schema.ClusterSchemaManager;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
@@ -344,8 +344,8 @@ public class NodeManager {
public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
- DataNodeRemoveHandler dataNodeRemoveHandler =
- new DataNodeRemoveHandler((ConfigManager) configManager);
+ RegionMaintainHandler dataNodeRemoveHandler =
+ new RegionMaintainHandler((ConfigManager) configManager);
DataNodeToStatusResp preCheckStatus =
dataNodeRemoveHandler.checkRemoveDataNodeRequest(removeDataNodePlan);
if (preCheckStatus.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index 518f3cc75c7..cf4cf20495a 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -104,14 +104,14 @@ public class ConfigNodeProcedureEnv {
private final ProcedureScheduler scheduler;
- private final DataNodeRemoveHandler dataNodeRemoveHandler;
+ private final RegionMaintainHandler dataNodeRemoveHandler;
private final ReentrantLock removeConfigNodeLock;
public ConfigNodeProcedureEnv(ConfigManager configManager,
ProcedureScheduler scheduler) {
this.configManager = configManager;
this.scheduler = scheduler;
- this.dataNodeRemoveHandler = new DataNodeRemoveHandler(configManager);
+ this.dataNodeRemoveHandler = new RegionMaintainHandler(configManager);
this.removeConfigNodeLock = new ReentrantLock();
}
@@ -735,7 +735,7 @@ public class ConfigNodeProcedureEnv {
return schedulerLock;
}
- public DataNodeRemoveHandler getDataNodeRemoveHandler() {
+ public RegionMaintainHandler getDataNodeRemoveHandler() {
return dataNodeRemoveHandler;
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
similarity index 93%
rename from
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
rename to
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 95ccf3de64e..acc95b66b40 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/DataNodeRemoveHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -22,8 +22,14 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
+import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.ClientPoolFactory;
+import org.apache.iotdb.commons.client.IClientManager;
+import org.apache.iotdb.commons.client.sync.SyncDataNodeInternalServiceClient;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.service.metric.MetricService;
import org.apache.iotdb.commons.utils.NodeUrlUtils;
@@ -59,9 +65,9 @@ import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REMOVE_DATANOD
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.SIMPLE_CONSENSUS;
-public class DataNodeRemoveHandler {
+public class RegionMaintainHandler {
- private static final Logger LOGGER =
LoggerFactory.getLogger(DataNodeRemoveHandler.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(RegionMaintainHandler.class);
private static final ConfigNodeConfig CONF =
ConfigNodeDescriptor.getInstance().getConf();
@@ -70,8 +76,14 @@ public class DataNodeRemoveHandler {
/** region migrate lock */
private final LockQueue regionMigrateLock = new LockQueue();
- public DataNodeRemoveHandler(ConfigManager configManager) {
+ private final IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
dataNodeClientManager;
+
+ public RegionMaintainHandler(ConfigManager configManager) {
this.configManager = configManager;
+ dataNodeClientManager =
+ new IClientManager.Factory<TEndPoint,
SyncDataNodeInternalServiceClient>()
+ .createClientManager(
+ new
ClientPoolFactory.SyncDataNodeInternalServiceClientPoolFactory());
}
public static String getIdWithRpcEndpoint(TDataNodeLocation location) {
@@ -234,12 +246,15 @@ public class DataNodeRemoveHandler {
* @return TSStatus
*/
public TSStatus addRegionPeer(
- TDataNodeLocation destDataNode, TConsensusGroupId regionId,
TDataNodeLocation coordinator) {
+ long procedureId,
+ TDataNodeLocation destDataNode,
+ TConsensusGroupId regionId,
+ TDataNodeLocation coordinator) {
TSStatus status;
// Send addRegionPeer request to the selected DataNode,
// destDataNode is where the new RegionReplica is created
- TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId,
destDataNode);
+ TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId,
destDataNode, procedureId);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
@@ -268,11 +283,13 @@ public class DataNodeRemoveHandler {
public TSStatus removeRegionPeer(
TDataNodeLocation originalDataNode,
TConsensusGroupId regionId,
- TDataNodeLocation coordinator) {
+ TDataNodeLocation coordinator,
+ long procedureId) {
TSStatus status;
// Send removeRegionPeer request to the rpcClientDataNode
- TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId,
originalDataNode);
+ TMaintainPeerReq maintainPeerReq =
+ new TMaintainPeerReq(regionId, originalDataNode, procedureId);
status =
SyncDataNodeClientPool.getInstance()
.sendSyncRequestToDataNodeWithRetry(
@@ -298,10 +315,11 @@ public class DataNodeRemoveHandler {
* @return TSStatus
*/
public TSStatus deleteOldRegionPeer(
- TDataNodeLocation originalDataNode, TConsensusGroupId regionId) {
+ TDataNodeLocation originalDataNode, TConsensusGroupId regionId, long
procedureId) {
TSStatus status;
- TMaintainPeerReq maintainPeerReq = new TMaintainPeerReq(regionId,
originalDataNode);
+ TMaintainPeerReq maintainPeerReq =
+ new TMaintainPeerReq(regionId, originalDataNode, procedureId);
status =
configManager.getLoadManager().getNodeStatus(originalDataNode.getDataNodeId())
@@ -325,6 +343,21 @@ public class DataNodeRemoveHandler {
return status;
}
+ /**
+  * Polls the given DataNode once per second until the region-maintain task identified by
+  * {@code taskId} is no longer reported as PROCESSING, then returns the reported status.
+  *
+  * @param taskId id of the task to query; passed to getRegionMaintainResult unchanged
+  * @param dataNodeLocation the DataNode whose internal endpoint is polled
+  * @return the first task status reported that is not PROCESSING
+  * @throws RuntimeException wrapping any failure to borrow the client or to query the
+  *     DataNode, or wrapping the InterruptedException if the wait between polls is interrupted
+  */
+ public TRegionMaintainTaskStatus waitTaskFinish(long taskId, TDataNodeLocation dataNodeLocation) {
+   while (true) {
+     // The client is borrowed only for the duration of one query, so it is handed back
+     // before this thread starts waiting for the next poll.
+     try (SyncDataNodeInternalServiceClient dataNodeClient =
+         dataNodeClientManager.borrowClient(dataNodeLocation.getInternalEndPoint())) {
+       TRegionMigrateResultReportReq report = dataNodeClient.getRegionMaintainResult(taskId);
+       if (report.getTaskStatus() != TRegionMaintainTaskStatus.PROCESSING) {
+         return report.getTaskStatus();
+       }
+     } catch (Exception e) {
+       throw new RuntimeException(e);
+     }
+     try {
+       Thread.sleep(1000);
+     } catch (InterruptedException e) {
+       // Restore the interrupt flag so code further up the stack can still observe it.
+       Thread.currentThread().interrupt();
+       throw new RuntimeException(e);
+     }
+   }
+ }
+
public void addRegionLocation(TConsensusGroupId regionId, TDataNodeLocation
newLocation) {
LOGGER.info(
"AddRegionLocation started, add region {} to {}",
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
index 103ab321fd8..ee4baf4cd25 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/node/RemoveDataNodeProcedure.java
@@ -25,7 +25,7 @@ import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.impl.statemachine.RegionMigrateProcedure;
import org.apache.iotdb.confignode.procedure.state.RemoveDataNodeState;
@@ -68,7 +68,7 @@ public class RemoveDataNodeProcedure extends
AbstractNodeProcedure<RemoveDataNod
return Flow.NO_MORE_STATE;
}
- DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+ RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
try {
switch (state) {
case REGION_REPLICA_CHECK:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
index b5b9448f771..488b85f3522 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/AddRegionPeerProcedure.java
@@ -21,18 +21,17 @@ package
org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.state.AddRegionPeerState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +41,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint;
-import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.ADD_REGION_PEER_PROGRESS;
-import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static
org.apache.iotdb.confignode.procedure.state.AddRegionPeerState.UPDATE_REGION_LOCATION_CACHE;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
@@ -56,11 +53,6 @@ public class AddRegionPeerProcedure
private TDataNodeLocation destDataNode;
- private boolean addRegionPeerSuccess = true;
- private String addRegionPeerResult;
-
- private final Object addRegionPeerLock = new Object();
-
public AddRegionPeerProcedure() {
super();
}
@@ -81,7 +73,7 @@ public class AddRegionPeerProcedure
if (consensusGroupId == null) {
return Flow.NO_MORE_STATE;
}
- DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+ RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
try {
switch (state) {
case CREATE_NEW_REGION_PEER:
@@ -90,17 +82,23 @@ public class AddRegionPeerProcedure
setNextState(AddRegionPeerState.DO_ADD_REGION_PEER);
break;
case DO_ADD_REGION_PEER:
- TSStatus tsStatus = handler.addRegionPeer(destDataNode,
consensusGroupId, coordinator);
+ TSStatus tsStatus =
+ handler.addRegionPeer(this.getProcId(), destDataNode,
consensusGroupId, coordinator);
+ TRegionMaintainTaskStatus result;
+ logBreakpoint(state.name());
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId, state);
+ result = handler.waitTaskFinish(this.getProcId(), coordinator);
} else {
throw new ProcedureException("ADD_REGION_PEER executed failed in
DataNode");
}
- logBreakpoint(state.name());
- setNextState(UPDATE_REGION_LOCATION_CACHE);
- break;
+ if (result == TRegionMaintainTaskStatus.SUCCESS) {
+ setNextState(UPDATE_REGION_LOCATION_CACHE);
+ break;
+ }
+ throw new ProcedureException("ADD_REGION_PEER executed failed in
DataNode");
case UPDATE_REGION_LOCATION_CACHE:
handler.addRegionLocation(consensusGroupId, destDataNode);
+ logBreakpoint(state.name());
return Flow.NO_MORE_STATE;
default:
throw new ProcedureException("Unsupported state: " + state.name());
@@ -112,61 +110,6 @@ public class AddRegionPeerProcedure
}
// TODO: Clear all remaining information related to 'migrate' and 'migration'
- public TSStatus waitForOneMigrationStepFinished(
- TConsensusGroupId consensusGroupId, AddRegionPeerState state) throws
Exception {
- LOGGER.info(
- "{}, Wait for state {} finished, regionId: {}",
- REGION_MIGRATE_PROCESS,
- state,
- consensusGroupId);
-
- TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode());
- synchronized (addRegionPeerLock) {
- try {
- addRegionPeerLock.wait();
-
- if (!addRegionPeerSuccess) {
- throw new ProcedureException(
- String.format("Region migration failed, regionId: %s",
consensusGroupId));
- }
- } catch (InterruptedException e) {
- LOGGER.error(
- "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS,
consensusGroupId, e);
- Thread.currentThread().interrupt();
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("Waiting for region migration interruption," +
e.getMessage());
- }
- }
- return status;
- }
-
- public void notifyAddPeerFinished(TRegionMigrateResultReportReq req) {
-
- LOGGER.info(
- "{}, ConfigNode received region migration result reported by DataNode:
{}",
- ADD_REGION_PEER_PROGRESS,
- req);
-
- // TODO the req is used in roll back
- synchronized (addRegionPeerLock) {
- TSStatus migrateStatus = req.getMigrateResult();
- // Migration failed
- if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info(
- "{}, Region migration failed in DataNode, migrateStatus: {}",
- ADD_REGION_PEER_PROGRESS,
- migrateStatus);
- addRegionPeerSuccess = false;
- addRegionPeerResult = migrateStatus.toString();
- }
- addRegionPeerLock.notifyAll();
- }
- }
-
- @Override
- protected boolean isRollbackSupported(AddRegionPeerState state) {
- return false;
- }
@Override
protected void rollbackState(
@@ -216,8 +159,4 @@ public class AddRegionPeerProcedure
public TDataNodeLocation getCoordinator() {
return coordinator;
}
-
- public TDataNodeLocation getDestDataNode() {
- return destDataNode;
- }
}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
index 35a51008ea6..8b9fc5d6ac2 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RegionMigrateProcedure.java
@@ -24,7 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.state.ProcedureLockState;
import org.apache.iotdb.confignode.procedure.state.RegionTransitionState;
@@ -40,7 +40,7 @@ import java.util.Objects;
import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint;
import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
-import static
org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler.getIdWithRpcEndpoint;
+import static
org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler.getIdWithRpcEndpoint;
/** Region migrate procedure */
public class RegionMigrateProcedure
@@ -52,8 +52,6 @@ public class RegionMigrateProcedure
private static final int RETRY_THRESHOLD = 5;
/** Wait region migrate finished */
- private final Object regionMigrateLock = new Object();
-
private TConsensusGroupId consensusGroupId;
private TDataNodeLocation originalDataNode;
@@ -63,8 +61,6 @@ public class RegionMigrateProcedure
private TDataNodeLocation coordinatorForAddPeer;
private TDataNodeLocation coordinatorForRemovePeer;
- private boolean migrateSuccess = true;
-
private String migrateResult = "";
public RegionMigrateProcedure() {
@@ -90,7 +86,7 @@ public class RegionMigrateProcedure
if (consensusGroupId == null) {
return Flow.NO_MORE_STATE;
}
- DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+ RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
try {
switch (state) {
case REGION_MIGRATE_PREPARE:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
index f656653a113..db58f456201 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/statemachine/RemoveRegionPeerProcedure.java
@@ -21,18 +21,17 @@ package
org.apache.iotdb.confignode.procedure.impl.statemachine;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
-import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
+import org.apache.iotdb.confignode.procedure.env.RegionMaintainHandler;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
import org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState;
import org.apache.iotdb.confignode.procedure.store.ProcedureType;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
-import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,7 +41,6 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import static org.apache.iotdb.commons.utils.FileUtils.logBreakpoint;
-import static
org.apache.iotdb.confignode.conf.ConfigNodeConstant.REGION_MIGRATE_PROCESS;
import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.DELETE_OLD_REGION_PEER;
import static
org.apache.iotdb.confignode.procedure.state.RemoveRegionPeerState.REMOVE_REGION_LOCATION_CACHE;
import static org.apache.iotdb.rpc.TSStatusCode.SUCCESS_STATUS;
@@ -54,10 +52,6 @@ public class RemoveRegionPeerProcedure
private TDataNodeLocation coordinator;
private TDataNodeLocation targetDataNode;
- private boolean removeRegionPeerSuccess = true;
- private String removeRegionPeerResult;
- private final Object removeRegionPeerLock = new Object();
-
public RemoveRegionPeerProcedure() {
super();
}
@@ -79,27 +73,37 @@ public class RemoveRegionPeerProcedure
return Flow.NO_MORE_STATE;
}
TSStatus tsStatus;
- DataNodeRemoveHandler handler = env.getDataNodeRemoveHandler();
+ RegionMaintainHandler handler = env.getDataNodeRemoveHandler();
try {
switch (state) {
case REMOVE_REGION_PEER:
- tsStatus = handler.removeRegionPeer(targetDataNode,
consensusGroupId, coordinator);
+ tsStatus =
+ handler.removeRegionPeer(
+ targetDataNode, consensusGroupId, coordinator,
this.getProcId());
+ TRegionMaintainTaskStatus result;
+ logBreakpoint(state.name());
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId, state);
+ result = handler.waitTaskFinish(this.getProcId(), coordinator);
} else {
throw new ProcedureException("REMOVE_REGION_PEER executed failed
in DataNode");
}
- logBreakpoint(state.name());
+ if (result != TRegionMaintainTaskStatus.SUCCESS) {
+ throw new ProcedureException("REMOVE_REGION_PEER executed failed
in DataNode");
+ }
setNextState(DELETE_OLD_REGION_PEER);
break;
case DELETE_OLD_REGION_PEER:
- tsStatus = handler.deleteOldRegionPeer(targetDataNode,
consensusGroupId);
+ tsStatus =
+ handler.deleteOldRegionPeer(targetDataNode, consensusGroupId,
this.getProcId());
+ logBreakpoint(state.name());
if (tsStatus.getCode() == SUCCESS_STATUS.getStatusCode()) {
- waitForOneMigrationStepFinished(consensusGroupId, state);
+ result = handler.waitTaskFinish(this.getProcId(), targetDataNode);
+ } else {
+ throw new ProcedureException("DELETE_OLD_REGION_PEER executed
failed in DataNode");
+ }
+ if (result != TRegionMaintainTaskStatus.SUCCESS) {
+ throw new ProcedureException("DELETE_OLD_REGION_PEER executed
failed in DataNode");
}
- logBreakpoint(state.name());
- // Remove consensus group after a node stop, which will be failed,
but we will
- // continuously execute.
setNextState(REMOVE_REGION_LOCATION_CACHE);
break;
case REMOVE_REGION_LOCATION_CACHE:
@@ -115,67 +119,10 @@ public class RemoveRegionPeerProcedure
return Flow.HAS_MORE_STATE;
}
- public TSStatus waitForOneMigrationStepFinished(
- TConsensusGroupId consensusGroupId, RemoveRegionPeerState state) throws
Exception {
-
- LOGGER.info(
- "{}, Wait for state {} finished, regionId: {}",
- REGION_MIGRATE_PROCESS,
- state,
- consensusGroupId);
-
- TSStatus status = new TSStatus(SUCCESS_STATUS.getStatusCode());
- synchronized (removeRegionPeerLock) {
- try {
- // TODO set timeOut?
- removeRegionPeerLock.wait();
-
- if (!removeRegionPeerSuccess) {
- throw new ProcedureException(
- String.format("Region migration failed, regionId: %s",
consensusGroupId));
- }
- } catch (InterruptedException e) {
- LOGGER.error(
- "{}, region migration {} interrupt", REGION_MIGRATE_PROCESS,
consensusGroupId, e);
- Thread.currentThread().interrupt();
- status.setCode(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
- status.setMessage("Waiting for region migration interruption," +
e.getMessage());
- }
- }
- return status;
- }
-
- public void notifyRemovePeerFinished(TRegionMigrateResultReportReq req) {
- LOGGER.info(
- "{}, ConfigNode received region migration result reported by DataNode:
{}",
- REGION_MIGRATE_PROCESS,
- req);
-
- // TODO the req is used in roll back
- synchronized (removeRegionPeerLock) {
- TSStatus migrateStatus = req.getMigrateResult();
- // Migration failed
- if (migrateStatus.getCode() != SUCCESS_STATUS.getStatusCode()) {
- LOGGER.info(
- "{}, Region migration failed in DataNode, migrateStatus: {}",
- REGION_MIGRATE_PROCESS,
- migrateStatus);
- removeRegionPeerSuccess = false;
- removeRegionPeerResult = migrateStatus.toString();
- }
- removeRegionPeerLock.notifyAll();
- }
- }
-
@Override
protected void rollbackState(ConfigNodeProcedureEnv env,
RemoveRegionPeerState state)
throws IOException, InterruptedException, ProcedureException {}
- @Override
- protected boolean isRollbackSupported(RemoveRegionPeerState state) {
- return false;
- }
-
@Override
protected RemoveRegionPeerState getState(int stateId) {
return RemoveRegionPeerState.values()[stateId];
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
index 95c82a1c001..5b77ad2f046 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/thrift/ConfigNodeRPCServiceProcessor.java
@@ -129,7 +129,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -270,11 +269,6 @@ public class ConfigNodeRPCServiceProcessor implements
IConfigNodeRPCService.Ifac
return resp;
}
- @Override
- public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)
{
- return configManager.reportRegionMigrateResult(req);
- }
-
@Override
public TShowClusterResp showCluster() {
return configManager.showCluster();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
index 1387e5f338f..089047a704e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/client/ConfigNodeClient.java
@@ -98,7 +98,6 @@ import
org.apache.iotdb.confignode.rpc.thrift.TGetUDFTableResp;
import org.apache.iotdb.confignode.rpc.thrift.TLoginReq;
import org.apache.iotdb.confignode.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.confignode.rpc.thrift.TPermissionInfoResp;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.confignode.rpc.thrift.TRegionRouteMapResp;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementReq;
import org.apache.iotdb.confignode.rpc.thrift.TSchemaNodeManagementResp;
@@ -417,12 +416,6 @@ public class ConfigNodeClient implements
IConfigNodeRPCService.Iface, ThriftClie
resp -> !updateConfigNodeLeader(resp.status));
}
- @Override
- public TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)
throws TException {
- return executeRemoteCallWithRetry(
- () -> client.reportRegionMigrateResult(req), status ->
!updateConfigNodeLeader(status));
- }
-
@Override
public TShowClusterResp showCluster() throws TException {
return executeRemoteCallWithRetry(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index 5a62e1ddd6a..c4588d5bac3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
+import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.common.rpc.thrift.TSetSpaceQuotaReq;
import org.apache.iotdb.common.rpc.thrift.TSetTTLReq;
@@ -1618,6 +1619,11 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return status;
}
+ @Override
+ public TRegionMigrateResultReportReq getRegionMaintainResult(long taskId)
throws TException {
+ return RegionMigrateService.getRegionMaintainResult(taskId);
+ }
+
private TSStatus createNewRegion(ConsensusGroupId regionId, String
storageGroup, long ttl) {
return regionManager.createNewRegion(regionId, storageGroup, ttl);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 092394c9bf2..a39dd72d34f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -21,9 +21,10 @@ package org.apache.iotdb.db.service;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionMaintainTaskStatus;
import org.apache.iotdb.common.rpc.thrift.TRegionMigrateFailedType;
+import org.apache.iotdb.common.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
@@ -32,28 +33,24 @@ import org.apache.iotdb.commons.consensus.SchemaRegionId;
import org.apache.iotdb.commons.exception.StartupException;
import org.apache.iotdb.commons.service.IService;
import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.confignode.rpc.thrift.TRegionMigrateResultReportReq;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.consensus.exception.ConsensusException;
import
org.apache.iotdb.consensus.exception.PeerAlreadyInConsensusGroupException;
import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
-import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
-import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
import org.apache.iotdb.db.schemaengine.SchemaEngine;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.rescon.memory.AbstractPoolManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
public class RegionMigrateService implements IService {
@@ -67,6 +64,13 @@ public class RegionMigrateService implements IService {
private RegionMigratePool regionMigratePool;
+ // Map<taskId, taskStatus>
+ // TODO: Cannot yet handle a single procedure submitting multiple asynchronous tasks to the same DataNode
+ private static final ConcurrentHashMap<Long, TRegionMigrateResultReportReq>
taskResultMap =
+ new ConcurrentHashMap<>();
+ private static final TRegionMigrateResultReportReq unfinishedResult =
+ new TRegionMigrateResultReportReq();
+
private RegionMigrateService() {}
public static RegionMigrateService getInstance() {
@@ -80,10 +84,13 @@ public class RegionMigrateService implements IService {
* @return if the submit task succeed
*/
public synchronized boolean submitAddRegionPeerTask(TMaintainPeerReq req) {
-
boolean submitSucceed = true;
try {
- regionMigratePool.submit(new AddRegionPeerTask(req.getRegionId(),
req.getDestNode()));
+ if (!addToTaskResultMap(req.getTaskId())) {
+ LOGGER.warn("{} The AddRegionPeerTask {} has already been submitted
and will not be submitted again.", REGION_MIGRATE_PROCESS, req.getTaskId());
+ }
+ regionMigratePool.submit(
+ new AddRegionPeerTask(req.getTaskId(), req.getRegionId(),
req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
"{}, Submit AddRegionPeerTask error for Region: {}",
@@ -105,7 +112,11 @@ public class RegionMigrateService implements IService {
boolean submitSucceed = true;
try {
- regionMigratePool.submit(new RemoveRegionPeerTask(req.getRegionId(),
req.getDestNode()));
+ if (!addToTaskResultMap(req.getTaskId())) {
+ LOGGER.warn("{} The RemoveRegionPeer {} has already been submitted and
will not be submitted again.", REGION_MIGRATE_PROCESS, req.getTaskId());
+ }
+ regionMigratePool.submit(
+ new RemoveRegionPeerTask(req.getTaskId(), req.getRegionId(),
req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
"{}, Submit RemoveRegionPeer task error for Region: {}",
@@ -124,10 +135,13 @@ public class RegionMigrateService implements IService {
* @return if the submit task succeed
*/
public synchronized boolean submitDeleteOldRegionPeerTask(TMaintainPeerReq
req) {
-
boolean submitSucceed = true;
try {
- regionMigratePool.submit(new DeleteOldRegionPeerTask(req.getRegionId(),
req.getDestNode()));
+ if (!addToTaskResultMap(req.getTaskId())) {
+ LOGGER.warn("{} The DeleteOldRegionPeerTask {} has already been
submitted and will not be submitted again.", REGION_MIGRATE_PROCESS,
req.getTaskId());
+ }
+ regionMigratePool.submit(
+ new DeleteOldRegionPeerTask(req.getTaskId(), req.getRegionId(),
req.getDestNode()));
} catch (Exception e) {
LOGGER.error(
"{}, Submit DeleteOldRegionPeerTask error for Region: {}",
@@ -139,6 +153,14 @@ public class RegionMigrateService implements IService {
return submitSucceed;
}
+ /**
+  * Registers {@code taskId} in taskResultMap with the shared unfinished placeholder result,
+  * unless an entry for that id already exists.
+  *
+  * @param taskId id of the task being submitted
+  * @return true if the id was newly registered, false if an entry was already present
+  */
+ private boolean addToTaskResultMap(long taskId) {
+   // putIfAbsent performs the existence check and the insert as one atomic map operation;
+   // it returns null exactly when no previous mapping existed.
+   return taskResultMap.putIfAbsent(taskId, unfinishedResult) == null;
+ }
+
@Override
public void start() throws StartupException {
regionMigratePool = new RegionMigratePool();
@@ -190,13 +212,17 @@ public class RegionMigrateService implements IService {
private static final Logger taskLogger =
LoggerFactory.getLogger(AddRegionPeerTask.class);
+ private final long taskId;
+
// The RegionGroup that shall perform the add peer process
private final TConsensusGroupId tRegionId;
// The new DataNode to be added in the RegionGroup
private final TDataNodeLocation destDataNode;
- public AddRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation
destDataNode) {
+ public AddRegionPeerTask(
+ long taskId, TConsensusGroupId tRegionId, TDataNodeLocation
destDataNode) {
+ this.taskId = taskId;
this.tRegionId = tRegionId;
this.destDataNode = destDataNode;
}
@@ -205,11 +231,12 @@ public class RegionMigrateService implements IService {
public void run() {
TSStatus runResult = addPeer();
if (isFailed(runResult)) {
- reportFailed(tRegionId, destDataNode,
TRegionMigrateFailedType.AddPeerFailed, runResult);
+ taskFail(
+ taskId, tRegionId, destDataNode,
TRegionMigrateFailedType.AddPeerFailed, runResult);
return;
}
- reportSucceed(tRegionId, "AddPeer");
+ taskSucceed(taskId, tRegionId, "AddPeer");
}
private TSStatus addPeer() {
@@ -282,11 +309,15 @@ public class RegionMigrateService implements IService {
private static final Logger taskLogger =
LoggerFactory.getLogger(RemoveRegionPeerTask.class);
+ private final long taskId;
+
private final TConsensusGroupId tRegionId;
private final TDataNodeLocation destDataNode;
- public RemoveRegionPeerTask(TConsensusGroupId tRegionId, TDataNodeLocation
destDataNode) {
+ public RemoveRegionPeerTask(
+ long taskId, TConsensusGroupId tRegionId, TDataNodeLocation
destDataNode) {
+ this.taskId = taskId;
this.tRegionId = tRegionId;
this.destDataNode = destDataNode;
}
@@ -295,9 +326,10 @@ public class RegionMigrateService implements IService {
public void run() {
TSStatus runResult = removePeer();
if (isSucceed(runResult)) {
- reportSucceed(tRegionId, "RemovePeer");
+ taskSucceed(taskId, tRegionId, "RemovePeer");
} else {
- reportFailed(tRegionId, destDataNode,
TRegionMigrateFailedType.RemovePeerFailed, runResult);
+ taskFail(
+ taskId, tRegionId, destDataNode,
TRegionMigrateFailedType.RemovePeerFailed, runResult);
}
}
@@ -375,13 +407,15 @@ public class RegionMigrateService implements IService {
private static class DeleteOldRegionPeerTask implements Runnable {
private static final Logger taskLogger =
LoggerFactory.getLogger(DeleteOldRegionPeerTask.class);
+ private final long taskId;
private final TConsensusGroupId tRegionId;
private final TDataNodeLocation originalDataNode;
public DeleteOldRegionPeerTask(
- TConsensusGroupId tRegionId, TDataNodeLocation originalDataNode) {
+ long taskId, TConsensusGroupId tRegionId, TDataNodeLocation
originalDataNode) {
+ this.taskId = taskId;
this.tRegionId = tRegionId;
this.originalDataNode = originalDataNode;
}
@@ -391,7 +425,8 @@ public class RegionMigrateService implements IService {
// deletePeer: remove the peer from the consensus group
TSStatus runResult = deletePeer();
if (isFailed(runResult)) {
- reportFailed(
+ taskFail(
+ taskId,
tRegionId,
originalDataNode,
TRegionMigrateFailedType.RemoveConsensusGroupFailed,
@@ -400,12 +435,17 @@ public class RegionMigrateService implements IService {
// deleteRegion: delete region data
runResult = deleteRegion();
+
if (isFailed(runResult)) {
- reportFailed(
- tRegionId, originalDataNode,
TRegionMigrateFailedType.DeleteRegionFailed, runResult);
+ taskFail(
+ taskId,
+ tRegionId,
+ originalDataNode,
+ TRegionMigrateFailedType.DeleteRegionFailed,
+ runResult);
}
- reportSucceed(tRegionId, "DeletePeer");
+ taskSucceed(taskId, tRegionId, "DeletePeer");
}
private TSStatus deletePeer() {
@@ -476,57 +516,42 @@ public class RegionMigrateService implements IService {
private Holder() {}
}
- private static void reportSucceed(TConsensusGroupId tRegionId, String
migrateState) {
+ private static void taskSucceed(long taskId, TConsensusGroupId tRegionId,
String migrateState) {
TSStatus status = new
TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
status.setMessage(
String.format("Region: %s, state: %s, executed succeed", tRegionId,
migrateState));
- TRegionMigrateResultReportReq req = new
TRegionMigrateResultReportReq(tRegionId, status);
- try {
- reportRegionMigrateResultToConfigNode(req);
- } catch (Exception e) {
- LOGGER.error(
- "{}, Report region {} migrate result error in reportSucceed, result:
{}",
- REGION_MIGRATE_PROCESS,
- tRegionId,
- req,
- e);
- }
+ TRegionMigrateResultReportReq req =
+ new TRegionMigrateResultReportReq(TRegionMaintainTaskStatus.SUCCESS);
+ req.setRegionId(tRegionId).setMigrateResult(status);
+ taskResultMap.put(taskId, req);
}
- private static void reportFailed(
+ private static void taskFail(
+ long taskId,
TConsensusGroupId tRegionId,
TDataNodeLocation failedNode,
TRegionMigrateFailedType failedType,
TSStatus status) {
Map<TDataNodeLocation, TRegionMigrateFailedType> failedNodeAndReason = new
HashMap<>();
failedNodeAndReason.put(failedNode, failedType);
- TRegionMigrateResultReportReq req = new
TRegionMigrateResultReportReq(tRegionId, status);
+ TRegionMigrateResultReportReq req =
+ new TRegionMigrateResultReportReq(TRegionMaintainTaskStatus.FAIL);
+ req.setRegionId(tRegionId).setMigrateResult(status);
req.setFailedNodeAndReason(failedNodeAndReason);
- try {
- reportRegionMigrateResultToConfigNode(req);
- } catch (Exception e) {
- LOGGER.error(
- "{}, Report region {} migrate error in reportFailed, result:{}",
- REGION_MIGRATE_PROCESS,
- tRegionId,
- req,
- e);
- }
+ taskResultMap.put(taskId, req);
}
- private static void
reportRegionMigrateResultToConfigNode(TRegionMigrateResultReportReq req)
- throws TException, ClientManagerException {
- TSStatus status;
- try (ConfigNodeClient client =
-
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
- status = client.reportRegionMigrateResult(req);
- LOGGER.info(
- "{}, Report region {} migrate result {} to Config node succeed,
result: {}",
- REGION_MIGRATE_PROCESS,
- req.getRegionId(),
- req,
- status);
+ // TODO: what is the difference between static and non-static in a singleton?
+ public static TRegionMigrateResultReportReq getRegionMaintainResult(Long
taskId) {
+ TRegionMigrateResultReportReq result = new TRegionMigrateResultReportReq();
+ if (!taskResultMap.containsKey(taskId)) {
+ result.setTaskStatus(TRegionMaintainTaskStatus.TASK_NOT_EXIST);
+ } else if (taskResultMap.get(taskId) == unfinishedResult) {
+ result.setTaskStatus(TRegionMaintainTaskStatus.PROCESSING);
+ } else {
+ result = taskResultMap.get(taskId);
}
+ return result;
}
private static boolean isSucceed(TSStatus status) {
diff --git a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
index ba5a2da6b09..636b05888f4 100644
--- a/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
+++ b/iotdb-protocol/thrift-commons/src/main/thrift/common.thrift
@@ -90,6 +90,7 @@ struct TDataNodeConfiguration {
2: required TNodeResource resource
}
+// TODO: deprecated
enum TRegionMigrateFailedType {
AddPeerFailed,
RemovePeerFailed,
@@ -98,6 +99,20 @@ enum TRegionMigrateFailedType {
CreateRegionFailed
}
+struct TRegionMigrateResultReportReq {
+ 1: optional TConsensusGroupId regionId
+ 2: optional TSStatus migrateResult
+ 3: optional map<TDataNodeLocation, TRegionMigrateFailedType>
failedNodeAndReason
+ 4: required TRegionMaintainTaskStatus taskStatus
+}
+
+enum TRegionMaintainTaskStatus {
+ TASK_NOT_EXIST,
+ PROCESSING,
+ SUCCESS,
+ FAIL,
+}
+
struct TFlushReq {
1: optional string isSeq
2: optional list<string> storageGroups
diff --git a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
index 989255c0cc8..886e2bbe7d8 100644
--- a/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
+++ b/iotdb-protocol/thrift-confignode/src/main/thrift/confignode.thrift
@@ -140,12 +140,6 @@ struct TDataNodeRemoveResp {
2: optional map<common.TDataNodeLocation, common.TSStatus> nodeToStatus
}
-struct TRegionMigrateResultReportReq {
- 1: required common.TConsensusGroupId regionId
- 2: required common.TSStatus migrateResult
- 3: optional map<common.TDataNodeLocation, common.TRegionMigrateFailedType>
failedNodeAndReason
-}
-
struct TDataNodeConfigurationResp {
1: required common.TSStatus status
// map<DataNodeId, DataNodeConfiguration>
@@ -880,9 +874,6 @@ service IConfigNodeRPCService {
*/
TDataNodeConfigurationResp getDataNodeConfiguration(i32 dataNodeId)
- /** Report region migration complete */
- common.TSStatus reportRegionMigrateResult(TRegionMigrateResultReportReq req)
-
// ======================================================
// Database
// ======================================================
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 8afc6bbe3e7..7042f67c3a2 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -51,6 +51,7 @@ struct TCreatePeerReq {
struct TMaintainPeerReq {
1: required common.TConsensusGroupId regionId
2: required common.TDataNodeLocation destNode
+ 3: required i64 taskId
}
struct TFragmentInstanceId {
@@ -557,6 +558,11 @@ service IDataNodeRPCService {
*/
common.TSStatus deleteOldRegionPeer(TMaintainPeerReq req);
+ /**
+ * Get the result of a region maintenance task
+ */
+ common.TRegionMigrateResultReportReq getRegionMaintainResult(i64 taskId)
+
/**
* Config node will disable the Data node, the Data node will not accept
read/write request when disabled
* @param data node location