This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b15f706589a Fix region migration reliability regressions (#17513)
b15f706589a is described below
commit b15f706589a85864d24a7b8ff76255b2c15cde6e
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Apr 29 11:31:40 2026 +0800
Fix region migration reliability regressions (#17513)
* Fix region migration reliability regressions
* spotless
* Address consensus pipe test review
---
.../procedure/env/RegionMaintainHandler.java | 37 +++++++---
.../RegionMaintainHandlerConsensusPipeTest.java | 64 +++++++++++++++++-
.../apache/iotdb/consensus/iot/IoTConsensus.java | 2 +-
.../apache/iotdb/consensus/iot/StabilityTest.java | 13 ++++
.../PipeRealtimeDataRegionHybridSource.java | 3 +-
.../realtime/PipeRealtimeDataRegionLogSource.java | 1 +
.../realtime/PipeRealtimeDataRegionSource.java | 34 ++++++++++
.../PipeRealtimeDataRegionTsFileSource.java | 1 +
.../realtime/assigner/PipeDataRegionAssigner.java | 16 -----
.../PipeRealtimeReplicateIndexAssignmentTest.java | 79 ++++++++++++++++++++++
10 files changed, 222 insertions(+), 28 deletions(-)
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 5043becc1fa..47105dda1c5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -299,15 +299,22 @@ public class RegionMaintainHandler {
TMaintainPeerReq maintainPeerReq =
new TMaintainPeerReq(regionId, originalDataNode, procedureId);
- // Always use full retries regardless of node status, because after a
cluster crash the
- // target DataNode may be Unknown but still in the process of restarting.
+ final NodeStatus nodeStatus =
getDataNodeStatus(originalDataNode.getDataNodeId());
+ final boolean useFullRetry = !NodeStatus.Unknown.equals(nodeStatus);
+ if (!useFullRetry) {
+ LOGGER.info(
+ "{}, DataNode {} is {}, submit DELETE_OLD_REGION_PEER with a single
RPC attempt and let RemoveRegionPeerProcedure handle retries.",
+ REGION_MIGRATE_PROCESS,
+ simplifiedLocation(originalDataNode),
+ nodeStatus);
+ }
+
status =
- (TSStatus)
- SyncDataNodeClientPool.getInstance()
- .sendSyncRequestToDataNodeWithRetry(
- originalDataNode.getInternalEndPoint(),
- maintainPeerReq,
- CnToDnSyncRequestType.DELETE_OLD_REGION_PEER);
+ submitDataNodeSyncRequest(
+ originalDataNode.getInternalEndPoint(),
+ maintainPeerReq,
+ CnToDnSyncRequestType.DELETE_OLD_REGION_PEER,
+ useFullRetry);
LOGGER.info(
"{}, Send action deleteOldRegionPeer finished, regionId: {},
dataNodeId: {}",
REGION_MIGRATE_PROCESS,
@@ -316,6 +323,20 @@ public class RegionMaintainHandler {
return status;
}
+ protected NodeStatus getDataNodeStatus(int dataNodeId) {
+ return configManager.getLoadManager().getNodeStatus(dataNodeId);
+ }
+
+ protected TSStatus submitDataNodeSyncRequest(
+ TEndPoint endPoint, Object request, CnToDnSyncRequestType requestType,
boolean useFullRetry) {
+ return (TSStatus)
+ (useFullRetry
+ ? SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithRetry(endPoint, request,
requestType)
+ : SyncDataNodeClientPool.getInstance()
+ .sendSyncRequestToDataNodeWithGivenRetry(endPoint, request,
requestType, 1));
+ }
+
public Map<Integer, TSStatus> resetPeerList(
TConsensusGroupId regionId,
List<TDataNodeLocation> correctDataNodeLocations,
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
index b384b9e0f0a..a99d6f52fb8 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
@@ -24,11 +24,15 @@ import
org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ProcedureManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
import
org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
@@ -45,6 +49,9 @@ import java.util.HashMap;
import java.util.Map;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -59,7 +66,8 @@ public class RegionMaintainHandlerConsensusPipeTest {
private PipeManager pipeManager;
private PipeTaskCoordinator pipeTaskCoordinator;
private ProcedureManager procedureManager;
- private RegionMaintainHandler handler;
+ private LoadManager loadManager;
+ private TestRegionMaintainHandler handler;
private String originalConsensusProtocol;
@@ -76,13 +84,15 @@ public class RegionMaintainHandlerConsensusPipeTest {
pipeManager = mock(PipeManager.class);
pipeTaskCoordinator = mock(PipeTaskCoordinator.class);
procedureManager = mock(ProcedureManager.class);
+ loadManager = mock(LoadManager.class);
when(configManager.getPartitionManager()).thenReturn(partitionManager);
when(configManager.getPipeManager()).thenReturn(pipeManager);
when(configManager.getProcedureManager()).thenReturn(procedureManager);
+ when(configManager.getLoadManager()).thenReturn(loadManager);
when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
- handler = new RegionMaintainHandler(configManager);
+ handler = new TestRegionMaintainHandler(configManager);
}
@After
@@ -254,6 +264,34 @@ public class RegionMaintainHandlerConsensusPipeTest {
verify(procedureManager, times(1)).startConsensusPipe(pipe2to1);
}
+ @Test
+ public void testDeleteOldRegionPeerUsesSingleAttemptWhenNodeUnknown() {
+ TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+
when(loadManager.getNodeStatus(loc1.getDataNodeId())).thenReturn(NodeStatus.Unknown);
+
+ handler.submitDeleteOldRegionPeerTask(
+ 1L, loc1, new TConsensusGroupId(TConsensusGroupType.DataRegion, 100));
+
+ verify(loadManager, times(1)).getNodeStatus(loc1.getDataNodeId());
+ assertFalse(handler.lastUseFullRetry);
+ assertEquals(CnToDnSyncRequestType.DELETE_OLD_REGION_PEER,
handler.lastRequestType);
+ assertEquals(loc1.getInternalEndPoint(), handler.lastEndPoint);
+ }
+
+ @Test
+ public void testDeleteOldRegionPeerKeepsFullRetryWhenNodeRunning() {
+ TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+
when(loadManager.getNodeStatus(loc1.getDataNodeId())).thenReturn(NodeStatus.Running);
+
+ handler.submitDeleteOldRegionPeerTask(
+ 1L, loc1, new TConsensusGroupId(TConsensusGroupType.DataRegion, 100));
+
+ verify(loadManager, times(1)).getNodeStatus(loc1.getDataNodeId());
+ assertTrue(handler.lastUseFullRetry);
+ assertEquals(CnToDnSyncRequestType.DELETE_OLD_REGION_PEER,
handler.lastRequestType);
+ assertEquals(loc1.getInternalEndPoint(), handler.lastEndPoint);
+ }
+
@Test
public void testThreeNodeReplicaSetCreatesAllSixPipes() {
TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
@@ -287,4 +325,26 @@ public class RegionMaintainHandlerConsensusPipeTest {
verify(procedureManager, never()).dropConsensusPipeAsync(any());
verify(procedureManager, never()).startConsensusPipe(any());
}
+
+ private static class TestRegionMaintainHandler extends RegionMaintainHandler
{
+ private boolean lastUseFullRetry;
+ private TEndPoint lastEndPoint;
+ private CnToDnSyncRequestType lastRequestType;
+
+ private TestRegionMaintainHandler(ConfigManager configManager) {
+ super(configManager);
+ }
+
+ @Override
+ protected TSStatus submitDataNodeSyncRequest(
+ TEndPoint endPoint,
+ Object request,
+ CnToDnSyncRequestType requestType,
+ boolean useFullRetry) {
+ lastEndPoint = endPoint;
+ lastRequestType = requestType;
+ lastUseFullRetry = useFullRetry;
+ return new TSStatus(200);
+ }
+ }
}
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index a3410217868..256b918c02c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -273,7 +273,7 @@ public class IoTConsensus implements IConsensus {
String path = buildPeerDir(storageDir, groupId);
File file = new File(path);
- if (!file.mkdirs()) {
+ if (!file.exists() && !file.mkdirs()) {
logger.warn("Unable to create consensus dir for group {}
at {}", groupId, path);
return null;
}
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index 5147632431f..1351b674312 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -186,6 +186,19 @@ public class StabilityTest {
consensusImpl.deleteLocalPeer(dataRegionId);
}
+ @Test
+ public void createLocalPeerShouldAllowExistingConsensusDir() throws
Exception {
+ File existingPeerDir = new File(IoTConsensus.buildPeerDir(storageDir,
dataRegionId));
+ Assert.assertTrue(existingPeerDir.mkdirs());
+
+ consensusImpl.createLocalPeer(
+ dataRegionId,
+ Collections.singletonList(new Peer(dataRegionId, 1, new
TEndPoint("0.0.0.0", basePort))));
+
+ Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
+ consensusImpl.deleteLocalPeer(dataRegionId);
+ }
+
public void transferLeader() {
try {
consensusImpl.transferLeader(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 97b6d54fde5..2d0ee0dc6b2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -217,7 +217,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent)
pendingQueue.directPoll();
while (realtimeEvent != null) {
- final Event suppliedEvent;
+ Event suppliedEvent;
// Used to judge the type of the event, not directly for supplying.
final Event eventToSupply = realtimeEvent.getEvent();
@@ -241,6 +241,7 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
PipeRealtimeDataRegionHybridSource.class.getName(), false);
if (suppliedEvent != null) {
+ suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent,
suppliedEvent);
maySkipIndex4Event(realtimeEvent);
return suppliedEvent;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index 579310b3f15..3d9c81bcb0c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -123,6 +123,7 @@ public class PipeRealtimeDataRegionLogSource extends
PipeRealtimeDataRegionSourc
realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(),
false);
if (suppliedEvent != null) {
+ suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent,
suppliedEvent);
return suppliedEvent;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index da5b81c6e36..a1eebcbdbd5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -33,12 +33,16 @@ import
org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.consensus.pipe.IoTConsensusV2;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import
org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
@@ -463,6 +467,36 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
}
}
+ protected Event assignReplicateIndexIfNeeded(
+ final PipeRealtimeEvent realtimeEvent, final Event suppliedEvent) {
+ if (!(suppliedEvent instanceof EnrichedEvent) ||
!shouldAssignReplicateIndex(suppliedEvent)) {
+ return suppliedEvent;
+ }
+
+ final EnrichedEvent enrichedEvent = (EnrichedEvent) suppliedEvent;
+ if (enrichedEvent.getReplicateIndexForIoTV2() !=
EnrichedEvent.NO_COMMIT_ID) {
+ return suppliedEvent;
+ }
+
+
enrichedEvent.setReplicateIndexForIoTV2(assignReplicateIndexForRealtimeEvent());
+ LOGGER.debug(
+ "[{}]Set {} for realtime event {}",
+ pipeName,
+ enrichedEvent.getReplicateIndexForIoTV2(),
+ realtimeEvent.coreReportMessage());
+ return suppliedEvent;
+ }
+
+ protected boolean shouldAssignReplicateIndex(final Event suppliedEvent) {
+ return !(suppliedEvent instanceof ProgressReportEvent)
+ && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+ && IoTConsensusV2Processor.isShouldReplicate((EnrichedEvent)
suppliedEvent);
+ }
+
+ protected long assignReplicateIndexForRealtimeEvent() {
+ return
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName);
+ }
+
protected Event supplyHeartbeat(final PipeRealtimeEvent event) {
if
(event.increaseReferenceCount(PipeRealtimeDataRegionSource.class.getName())) {
return event.getEvent();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 97c3138de7c..d70d93db548 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -114,6 +114,7 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
PipeRealtimeDataRegionTsFileSource.class.getName(), false);
if (suppliedEvent != null) {
+ suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent,
suppliedEvent);
maySkipIndex4Event(realtimeEvent);
return suppliedEvent;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index f40de994c31..9c356e8654b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -24,9 +24,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.commons.utils.PathUtils;
-import org.apache.iotdb.consensus.pipe.IoTConsensusV2;
-import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -36,7 +33,6 @@ import
org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
-import
org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
import
org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher.PipeDataRegionMatcher;
@@ -174,18 +170,6 @@ public class PipeDataRegionAssigner implements Closeable {
source.getRealtimeDataExtractionStartTime(),
source.getRealtimeDataExtractionEndTime());
final EnrichedEvent innerEvent = copiedEvent.getEvent();
- // if using IoTV2, assign a replicateIndex for this realtime
event
- if (DataRegionConsensusImpl.getInstance() instanceof
IoTConsensusV2
- && IoTConsensusV2Processor.isShouldReplicate(innerEvent)) {
- innerEvent.setReplicateIndexForIoTV2(
-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
- source.getPipeName()));
- LOGGER.debug(
- "[{}]Set {} for realtime event {}",
- source.getPipeName(),
- innerEvent.getReplicateIndexForIoTV2(),
- innerEvent);
- }
if (innerEvent instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java
new file mode 100644
index 00000000000..b58886afb3b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PipeRealtimeReplicateIndexAssignmentTest {
+
+ @Test
+ public void assignReplicateIndexShouldBeLazyAndIdempotent() {
+ final TestPipeRealtimeDataRegionSource source = new
TestPipeRealtimeDataRegionSource();
+ final PipeDeleteDataNodeEvent event = new PipeDeleteDataNodeEvent();
+
+ Assert.assertEquals(EnrichedEvent.NO_COMMIT_ID,
event.getReplicateIndexForIoTV2());
+
+ final Event suppliedEvent = source.assign(event);
+ Assert.assertSame(event, suppliedEvent);
+ Assert.assertEquals(1L, event.getReplicateIndexForIoTV2());
+ Assert.assertEquals(1L, source.assignedCount.get());
+
+ source.assign(event);
+ Assert.assertEquals(1L, event.getReplicateIndexForIoTV2());
+ Assert.assertEquals(1L, source.assignedCount.get());
+ }
+
+ private static class TestPipeRealtimeDataRegionSource extends
PipeRealtimeDataRegionLogSource {
+ private final AtomicLong nextReplicateIndex = new AtomicLong(1);
+ private final AtomicLong assignedCount = new AtomicLong(0);
+
+ private Event assign(final PipeDeleteDataNodeEvent event) {
+ final TsFileResource resource = mock(TsFileResource.class);
+ when(resource.getTsFilePath()).thenReturn("target/test.tsfile");
+ final PipeRealtimeEvent realtimeEvent =
+ new PipeRealtimeEvent(event, new TsFileEpoch(resource), null);
+ return assignReplicateIndexIfNeeded(realtimeEvent, event);
+ }
+
+ @Override
+ protected boolean shouldAssignReplicateIndex(final Event suppliedEvent) {
+ return true;
+ }
+
+ @Override
+ protected long assignReplicateIndexForRealtimeEvent() {
+ assignedCount.incrementAndGet();
+ return nextReplicateIndex.getAndIncrement();
+ }
+ }
+}