This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 04b5ba4c63f Load & Region Migrate: Notify all DNs before and after RM
(#15032) (#15131)
04b5ba4c63f is described below
commit 04b5ba4c63f31c0f2d2961e8dffdc83a83ec04ea
Author: Zikun Ma <[email protected]>
AuthorDate: Tue Mar 25 10:30:42 2025 +0800
Load & Region Migrate: Notify all DNs before and after RM (#15032) (#15131)
(cherry picked from commit 50a48ce42b3c0a608fca49c326e364e3226f3a0d)
---
.../client/async/CnToDnAsyncRequestType.java | 1 +
.../CnToDnInternalServiceAsyncRequestManager.java | 6 +
.../rpc/DataNodeAsyncRequestRPCHandler.java | 1 +
.../procedure/env/ConfigNodeProcedureEnv.java | 15 +++
.../region/NotifyRegionMigrationProcedure.java | 137 +++++++++++++++++++++
.../impl/region/RegionMigrateProcedure.java | 2 +
.../state/NotifyRegionMigrationState.java | 24 ++++
.../procedure/store/ProcedureFactory.java | 7 ++
.../confignode/procedure/store/ProcedureType.java | 1 +
.../impl/DataNodeInternalRPCServiceImpl.java | 7 ++
.../plan/scheduler/load/LoadTsFileScheduler.java | 10 ++
.../iotdb/db/service/RegionMigrateService.java | 18 +++
.../src/main/thrift/datanode.thrift | 10 ++
13 files changed, 239 insertions(+)
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
index e5d1515d27e..225d4341913 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnAsyncRequestType.java
@@ -39,6 +39,7 @@ public enum CnToDnAsyncRequestType {
CREATE_SCHEMA_REGION,
DELETE_REGION,
RESET_PEER_LIST,
+ NOTIFY_REGION_MIGRATION,
UPDATE_REGION_ROUTE_MAP,
CHANGE_REGION_LEADER,
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
index 1bae93c7399..59f99fe57cb 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/CnToDnInternalServiceAsyncRequestManager.java
@@ -71,6 +71,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TFetchSchemaBlackListReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateMatchedSchemaCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -245,6 +246,11 @@ public class CnToDnInternalServiceAsyncRequestManager
CnToDnAsyncRequestType.UPDATE_REGION_ROUTE_MAP,
(req, client, handler) ->
client.updateRegionCache((TRegionRouteReq) req,
(DataNodeTSStatusRPCHandler) handler));
+ actionMapBuilder.put(
+ CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION,
+ (req, client, handler) ->
+ client.notifyRegionMigration(
+ (TNotifyRegionMigrationReq) req, (DataNodeTSStatusRPCHandler)
handler));
actionMapBuilder.put(
CnToDnAsyncRequestType.CHANGE_REGION_LEADER,
(req, client, handler) ->
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
index dc8f14cfaa9..6e2a9dc97cc 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/client/async/handlers/rpc/DataNodeAsyncRequestRPCHandler.java
@@ -202,6 +202,7 @@ public abstract class
DataNodeAsyncRequestRPCHandler<Response>
case STOP_REPAIR_DATA:
case LOAD_CONFIGURATION:
case SET_SYSTEM_STATUS:
+ case NOTIFY_REGION_MIGRATION:
case UPDATE_REGION_ROUTE_MAP:
case INVALIDATE_SCHEMA_CACHE:
case INVALIDATE_MATCHED_SCHEMA_CACHE:
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
index b6f4fe35d9e..a21d2349ed9 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/ConfigNodeProcedureEnv.java
@@ -70,6 +70,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TDropPipePluginInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TDropTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInactiveTriggerInstanceReq;
import org.apache.iotdb.mpp.rpc.thrift.TInvalidateCacheReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushMultiPipeMetaReq;
@@ -522,6 +523,20 @@ public class ConfigNodeProcedureEnv {
return req;
}
+  /**
+   * Sends a {@code NOTIFY_REGION_MIGRATION} request to every registered DataNode.
+   *
+   * @param consensusGroupId the region group the notification refers to
+   * @param isStart forwarded unchanged in the request; callers pass {@code true} before the
+   *     migration and {@code false} after it
+   * @return the response list held by the request context once the send call has returned
+   */
+  public List<TSStatus> notifyRegionMigrationToAllDataNodes(
+      TConsensusGroupId consensusGroupId, boolean isStart) {
+    final Map<Integer, TDataNodeLocation> registeredDataNodes =
+        configManager.getNodeManager().getRegisteredDataNodeLocations();
+    final TNotifyRegionMigrationReq notifyReq =
+        new TNotifyRegionMigrationReq(consensusGroupId, isStart);
+
+    // One context carries the same request to all target DataNodes and gathers their replies.
+    final DataNodeAsyncRequestContext<TNotifyRegionMigrationReq, TSStatus> requestContext =
+        new DataNodeAsyncRequestContext<>(
+            CnToDnAsyncRequestType.NOTIFY_REGION_MIGRATION, notifyReq, registeredDataNodes);
+    CnToDnInternalServiceAsyncRequestManager.getInstance()
+        .sendAsyncRequestWithRetry(requestContext);
+    return requestContext.getResponseList();
+  }
+
public void persistRegionGroup(CreateRegionGroupsPlan
createRegionGroupsPlan) {
// Persist the allocation result
try {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
new file mode 100644
index 00000000000..0160e9116f9
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/NotifyRegionMigrationProcedure.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.impl.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.commons.exception.runtime.ThriftSerDeException;
+import org.apache.iotdb.commons.utils.ThriftCommonsSerDeUtils;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
+import
org.apache.iotdb.confignode.procedure.exception.ProcedureSuspendedException;
+import org.apache.iotdb.confignode.procedure.exception.ProcedureYieldException;
+import org.apache.iotdb.confignode.procedure.state.NotifyRegionMigrationState;
+import org.apache.iotdb.confignode.procedure.store.ProcedureType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+/**
+ * A procedure that notifies all DNs of the ongoing region migration procedure.
+ *
+ * <p>It has a single state ({@link NotifyRegionMigrationState#INIT}) in which it broadcasts the
+ * notification once. A failed broadcast is logged and the procedure still terminates, so the
+ * notification is best effort.
+ */
+public class NotifyRegionMigrationProcedure
+    extends RegionOperationProcedure<NotifyRegionMigrationState> {
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(NotifyRegionMigrationProcedure.class);
+
+  /** Forwarded to the DataNodes: {@code true} before the migration, {@code false} after it. */
+  private boolean isStart;
+
+  /** Creates an empty instance whose fields are filled in later by {@link #deserialize}. */
+  public NotifyRegionMigrationProcedure() {
+    super();
+  }
+
+  /**
+   * @param consensusGroupId the region group being migrated
+   * @param isStart {@code true} to announce the start of the migration, {@code false} to announce
+   *     its end
+   */
+  public NotifyRegionMigrationProcedure(TConsensusGroupId consensusGroupId, boolean isStart) {
+    super(consensusGroupId);
+    this.isStart = isStart;
+  }
+
+  /**
+   * Broadcasts the notification to all DataNodes and finishes the procedure.
+   *
+   * @return always {@link Flow#NO_MORE_STATE}: when there is no region id, when the broadcast
+   *     throws, and after a successful broadcast
+   */
+  @Override
+  protected Flow executeFromState(ConfigNodeProcedureEnv env, NotifyRegionMigrationState state)
+      throws ProcedureSuspendedException, ProcedureYieldException, InterruptedException {
+    if (regionId == null) {
+      // Nothing to notify about.
+      return Flow.NO_MORE_STATE;
+    }
+    try {
+      LOGGER.info(
+          "[pid{}][NotifyRegionMigration] started, region id is {}.", getProcId(), regionId);
+      env.notifyRegionMigrationToAllDataNodes(regionId, isStart);
+    } catch (Exception e) {
+      // Deliberately best effort: log the failure and end the procedure instead of failing it.
+      LOGGER.error("[pid{}][NotifyRegionMigration] state {} failed", getProcId(), state, e);
+      return Flow.NO_MORE_STATE;
+    }
+    LOGGER.info("[pid{}][NotifyRegionMigration] state {} complete", getProcId(), state);
+    // INIT is the only state and no next state is ever set, so the work is done here.
+    // Reporting HAS_MORE_STATE would claim a follow-up state that does not exist.
+    return Flow.NO_MORE_STATE;
+  }
+
+  /** No rollback action: this procedure only sends notifications. */
+  @Override
+  protected void rollbackState(
+      ConfigNodeProcedureEnv configNodeProcedureEnv, NotifyRegionMigrationState state)
+      throws IOException, InterruptedException, ProcedureException {}
+
+  @Override
+  protected NotifyRegionMigrationState getState(int stateId) {
+    // State ids are the enum ordinals, see getStateId.
+    return NotifyRegionMigrationState.values()[stateId];
+  }
+
+  @Override
+  protected int getStateId(NotifyRegionMigrationState state) {
+    return state.ordinal();
+  }
+
+  @Override
+  protected NotifyRegionMigrationState getInitialState() {
+    return NotifyRegionMigrationState.INIT;
+  }
+
+  /**
+   * Writes the type code, the superclass data, the region id and the {@code isStart} flag, in
+   * that order.
+   */
+  @Override
+  public void serialize(DataOutputStream stream) throws IOException {
+    stream.writeShort(ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE.getTypeCode());
+    super.serialize(stream);
+    ThriftCommonsSerDeUtils.serializeTConsensusGroupId(regionId, stream);
+    stream.writeBoolean(isStart);
+  }
+
+  /**
+   * Reads the superclass data, the region id and the {@code isStart} flag, in that order. The
+   * type code written by {@link #serialize} is not read here. A {@link ThriftSerDeException} is
+   * logged and not rethrown.
+   */
+  @Override
+  public void deserialize(ByteBuffer byteBuffer) {
+    super.deserialize(byteBuffer);
+    try {
+      regionId = ThriftCommonsSerDeUtils.deserializeTConsensusGroupId(byteBuffer);
+      // Counterpart of DataOutputStream#writeBoolean: a single byte, non-zero means true.
+      isStart = (byteBuffer.get() != (byte) 0);
+    } catch (ThriftSerDeException e) {
+      LOGGER.error("Error in deserialize {}", this.getClass(), e);
+    }
+  }
+
+  /**
+   * Two procedures are equal when they have the same region id and the same {@code isStart}
+   * flag. A null region id, as on a freshly constructed empty instance, is compared safely.
+   */
+  @Override
+  public boolean equals(Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (!(obj instanceof NotifyRegionMigrationProcedure)) {
+      return false;
+    }
+    NotifyRegionMigrationProcedure procedure = (NotifyRegionMigrationProcedure) obj;
+    return Objects.equals(this.regionId, procedure.regionId) && this.isStart == procedure.isStart;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(regionId, isStart);
+  }
+
+  @Override
+  public String toString() {
+    return "NotifyRegionMigrationProcedure{"
+        + "regionId="
+        + regionId
+        + ", isStart="
+        + isStart
+        + '}';
+  }
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
index f4d5dce26b6..1acdd2ccd54 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/region/RegionMigrateProcedure.java
@@ -83,6 +83,7 @@ public class RegionMigrateProcedure extends
RegionOperationProcedure<RegionTrans
regionId,
handler.simplifiedLocation(originalDataNode),
handler.simplifiedLocation(destDataNode));
+ addChildProcedure(new NotifyRegionMigrationProcedure(regionId,
true));
setNextState(RegionTransitionState.ADD_REGION_PEER);
break;
case ADD_REGION_PEER:
@@ -124,6 +125,7 @@ public class RegionMigrateProcedure extends
RegionOperationProcedure<RegionTrans
CommonDateTimeUtils.convertMillisecondToDurationStr(
System.currentTimeMillis() - getSubmittedTime()),
DateTimeUtils.convertLongToDate(getSubmittedTime(), "ms"));
+ addChildProcedure(new NotifyRegionMigrationProcedure(regionId,
false));
return Flow.NO_MORE_STATE;
default:
throw new ProcedureException("Unsupported state: " + state.name());
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
new file mode 100644
index 00000000000..1b964621e5b
--- /dev/null
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/state/NotifyRegionMigrationState.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.procedure.state;
+
+public enum NotifyRegionMigrationState { // state type parameter of NotifyRegionMigrationProcedure
+ INIT, // the only state; NotifyRegionMigrationProcedure#getInitialState returns it
+}
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
index e5d43137fac..fc88b54f3d5 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureFactory.java
@@ -40,6 +40,7 @@ import
org.apache.iotdb.confignode.procedure.impl.pipe.task.StartPipeProcedureV2
import
org.apache.iotdb.confignode.procedure.impl.pipe.task.StopPipeProcedureV2;
import
org.apache.iotdb.confignode.procedure.impl.region.AddRegionPeerProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.CreateRegionGroupsProcedure;
+import
org.apache.iotdb.confignode.procedure.impl.region.NotifyRegionMigrationProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.ReconstructRegionProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.RegionMigrateProcedure;
import
org.apache.iotdb.confignode.procedure.impl.region.RemoveRegionPeerProcedure;
@@ -120,6 +121,10 @@ public class ProcedureFactory implements IProcedureFactory
{
break;
case RECONSTRUCT_REGION_PROCEDURE:
procedure = new ReconstructRegionProcedure();
+ break;
+ case NOTIFY_REGION_MIGRATION_PROCEDURE:
+ procedure = new NotifyRegionMigrationProcedure();
+ break;
case DELETE_TIMESERIES_PROCEDURE:
procedure = new DeleteTimeSeriesProcedure(false);
break;
@@ -318,6 +323,8 @@ public class ProcedureFactory implements IProcedureFactory {
return ProcedureType.DELETE_TIMESERIES_PROCEDURE;
} else if (procedure instanceof ReconstructRegionProcedure) {
return ProcedureType.RECONSTRUCT_REGION_PROCEDURE;
+ } else if (procedure instanceof NotifyRegionMigrationProcedure) {
+ return ProcedureType.NOTIFY_REGION_MIGRATION_PROCEDURE;
} else if (procedure instanceof CreateTriggerProcedure) {
return ProcedureType.CREATE_TRIGGER_PROCEDURE;
} else if (procedure instanceof DropTriggerProcedure) {
diff --git
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
index 82b28186756..48ccca42d44 100644
---
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
+++
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ProcedureType.java
@@ -39,6 +39,7 @@ public enum ProcedureType {
RECONSTRUCT_REGION_PROCEDURE((short) 203),
ADD_REGION_PEER_PROCEDURE((short) 204),
REMOVE_REGION_PEER_PROCEDURE((short) 205),
+ NOTIFY_REGION_MIGRATION_PROCEDURE((short) 206),
@TestOnly
CREATE_MANY_DATABASES_PROCEDURE((short) 250),
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index b90efed22ac..d0924477154 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -207,6 +207,7 @@ import
org.apache.iotdb.mpp.rpc.thrift.TInvalidatePermissionCacheReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatReq;
import org.apache.iotdb.mpp.rpc.thrift.TPipeHeartbeatResp;
import org.apache.iotdb.mpp.rpc.thrift.TPushConsumerGroupMetaReq;
@@ -2144,6 +2145,12 @@ public class DataNodeInternalRPCServiceImpl implements
IDataNodeRPCService.Iface
return RegionMigrateService.getInstance().getRegionMaintainResult(taskId);
}
+  /**
+   * Hands the notification to {@link RegionMigrateService} and replies with a success status.
+   *
+   * @param req the notification sent by the ConfigNode
+   * @return a status carrying {@code SUCCESS_STATUS}
+   */
+  @Override
+  public TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req) throws TException {
+    final RegionMigrateService migrateService = RegionMigrateService.getInstance();
+    migrateService.notifyRegionMigration(req);
+    final int successCode = TSStatusCode.SUCCESS_STATUS.getStatusCode();
+    return new TSStatus(successCode);
+  }
+
private TSStatus createNewRegion(ConsensusGroupId regionId, String
storageGroup) {
return regionManager.createNewRegion(regionId, storageGroup);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index 04d38f25f89..0af4d91adfe 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -56,6 +56,7 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePie
import
org.apache.iotdb.db.queryengine.plan.scheduler.FragInstanceDispatchResult;
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.db.service.RegionMigrateService;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.flush.MemTableFlushTask;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -182,6 +183,8 @@ public class LoadTsFileScheduler implements IScheduler {
}
shouldRemoveFileFromLoadingSet = true;
+ final long startTimeMs = System.currentTimeMillis();
+
if (node.isTsFileEmpty()) {
LOGGER.info("Load skip TsFile {}, because it has no data.",
filePath);
} else if (!node.needDecodeTsFile(
@@ -226,6 +229,13 @@ public class LoadTsFileScheduler implements IScheduler {
}
}
+ if (RegionMigrateService.getInstance().getLastNotifyTime() >
startTimeMs) {
+ LOGGER.warn(
+ "LoadTsFileScheduler: Region migration started or ended during
loading TsFile {}, will convert to insertion to avoid data loss",
+ filePath);
+ isLoadSingleTsFileSuccess = false;
+ }
+
if (isLoadSingleTsFileSuccess) {
node.clean();
LOGGER.info(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
index 056df802c13..05328ea91b9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/RegionMigrateService.java
@@ -41,6 +41,7 @@ import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.consensus.SchemaRegionConsensusImpl;
import org.apache.iotdb.db.protocol.thrift.impl.DataNodeRegionManager;
import org.apache.iotdb.mpp.rpc.thrift.TMaintainPeerReq;
+import org.apache.iotdb.mpp.rpc.thrift.TNotifyRegionMigrationReq;
import org.apache.iotdb.mpp.rpc.thrift.TRegionMigrateResult;
import org.apache.iotdb.mpp.rpc.thrift.TResetPeerListReq;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -53,6 +54,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
public class RegionMigrateService implements IService {
@@ -72,6 +74,9 @@ public class RegionMigrateService implements IService {
// where different asynchronous tasks are submitted to the same datanode
within a single procedure
private static final ConcurrentHashMap<Long, TRegionMigrateResult>
taskResultMap =
new ConcurrentHashMap<>();
+
+ private static final AtomicLong lastNotifyTime = new
AtomicLong(Long.MIN_VALUE);
+
private static final TRegionMigrateResult unfinishedResult = new
TRegionMigrateResult();
private RegionMigrateService() {}
@@ -80,6 +85,19 @@ public class RegionMigrateService implements IService {
return Holder.INSTANCE;
}
+  /**
+   * Records the current wall-clock time as the latest notification time and logs whether the
+   * region was announced as beginning or finishing its migration.
+   *
+   * @param req the notification; its {@code isStart} flag selects the log message
+   */
+  public void notifyRegionMigration(TNotifyRegionMigrationReq req) {
+    // Stamp first so that readers of getLastNotifyTime() see the notification before it is logged.
+    lastNotifyTime.set(System.currentTimeMillis());
+    final String logFormat =
+        req.isIsStart()
+            ? "Region {} is notified to begin migrating"
+            : "Region {} is notified to finish migrating";
+    LOGGER.info(logFormat, req.getRegionId());
+  }
+
+  /**
+   * @return the {@code System.currentTimeMillis()} value stored by the latest {@link
+   *     #notifyRegionMigration} call, or {@code Long.MIN_VALUE} if none has been received yet
+   */
+  public long getLastNotifyTime() {
+    final long latestNotifyTimeMs = lastNotifyTime.get();
+    return latestNotifyTimeMs;
+  }
+
/**
* Submit AddRegionPeerTask
*
diff --git a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
index 4b25e08d1f6..bef96496650 100644
--- a/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
+++ b/iotdb-protocol/thrift-datanode/src/main/thrift/datanode.thrift
@@ -52,6 +52,11 @@ struct TRegionMigrateResult {
4: required common.TRegionMaintainTaskStatus taskStatus
}
+struct TNotifyRegionMigrationReq { // argument of IDataNodeRPCService.notifyRegionMigration
+ 1: required common.TConsensusGroupId regionId // the region group whose migration is announced
+ 2: required bool isStart // true: migration is beginning; false: migration is finishing
+}
+
struct TCreatePeerReq {
1: required common.TConsensusGroupId regionId
2: required list<common.TDataNodeLocation> regionLocations
@@ -781,6 +786,11 @@ service IDataNodeRPCService {
*/
TRegionMigrateResult getRegionMaintainResult(i64 taskId)
+ /**
+ * Notify the DataNode that the migration of the specified RegionGroup is beginning or ending
+ */
+ common.TSStatus notifyRegionMigration(TNotifyRegionMigrationReq req)
+
/**
* Config node will clean DataNode cache, the Data node will not accept
read/write request when disabled
* @param data node location