This is an automated email from the ASF dual-hosted git repository.
tanxinyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 5387a102543 Modify regionMigrateService's ThreadPool to IoTThreadPoolFactory.cachedThreadPool (#13548)
5387a102543 is described below
commit 5387a10254378696693ef979b78350457f5fc2d0
Author: 133tosakarin <[email protected]>
AuthorDate: Fri Sep 20 00:29:54 2024 +0800
Modify regionMigrateService's ThreadPool to IoTThreadPoolFactory.cachedThreadPool (#13548)
* modify regionMigratePool to IoTThreadPool
* fix ut crash
---
.../apache/iotdb/db/schemaengine/SchemaEngine.java | 2 +-
.../iotdb/db/service/RegionMigrateService.java | 36 +++-------------------
2 files changed, 6 insertions(+), 32 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
index e3ef823b144..cdbe91f4e02 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java
@@ -197,7 +197,7 @@ public class SchemaEngine {
for (SchemaRegionId schemaRegionId : v) {
PartialPath database;
try {
- database = new PartialPath(k);
+ database = PartialPath.getDatabasePath(k);
} catch (IllegalPathException e) {
logger.warn("Illegal database path: {}", k);
continue;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 4a455637220..3d86b1ae3f5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -40,7 +40,6 @@ import org.apache.iotdb.consensus.exception.PeerNotInConsensusGroupException;
import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
-import org.apache.iotdb.db.storageengine.rescon.memory.AbstractPoolManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
@@ -53,6 +52,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
public class RegionMigrateService implements IService {
@@ -65,7 +65,7 @@ public class RegionMigrateService implements IService {
private static final int SLEEP_MILLIS = 5000;
- private RegionMigratePool regionMigratePool;
+ private ExecutorService regionMigratePool;
// Map<taskId, taskStatus>
// TODO: Due to the use of procedureId as taskId, it is currently unable to handle the situation
@@ -199,15 +199,15 @@ public class RegionMigrateService implements IService {
@Override
public void start() throws StartupException {
- regionMigratePool = new RegionMigratePool();
- regionMigratePool.start();
+ regionMigratePool =
+ IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.REGION_MIGRATE.getName());
LOGGER.info("Region migrate service start");
}
@Override
public void stop() {
if (regionMigratePool != null) {
- regionMigratePool.stop();
+ regionMigratePool.shutdown();
}
LOGGER.info("Region migrate service stop");
}
@@ -217,32 +217,6 @@ public class RegionMigrateService implements IService {
return ServiceType.DATA_NODE_REGION_MIGRATE_SERVICE;
}
- private static class RegionMigratePool extends AbstractPoolManager {
-
- private final Logger poolLogger = LoggerFactory.getLogger(RegionMigratePool.class);
-
- private RegionMigratePool() {
- this.pool = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.REGION_MIGRATE.getName());
- }
-
- @Override
- public Logger getLogger() {
- return poolLogger;
- }
-
- @Override
- public void start() {
- if (this.pool != null) {
- poolLogger.info("DataNode region migrate pool start");
- }
- }
-
- @Override
- public String getName() {
- return "migrate region";
- }
- }
-
private static class AddRegionPeerTask implements Runnable {
private static final Logger taskLogger = LoggerFactory.getLogger(AddRegionPeerTask.class);