This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new b15f706589a Fix region migration reliability regressions (#17513)
b15f706589a is described below

commit b15f706589a85864d24a7b8ff76255b2c15cde6e
Author: Peng Junzhi <[email protected]>
AuthorDate: Wed Apr 29 11:31:40 2026 +0800

    Fix region migration reliability regressions (#17513)
    
    * Fix region migration reliability regressions
    
    * spotless
    
    * Address consensus pipe test review
---
 .../procedure/env/RegionMaintainHandler.java       | 37 +++++++---
 .../RegionMaintainHandlerConsensusPipeTest.java    | 64 +++++++++++++++++-
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |  2 +-
 .../apache/iotdb/consensus/iot/StabilityTest.java  | 13 ++++
 .../PipeRealtimeDataRegionHybridSource.java        |  3 +-
 .../realtime/PipeRealtimeDataRegionLogSource.java  |  1 +
 .../realtime/PipeRealtimeDataRegionSource.java     | 34 ++++++++++
 .../PipeRealtimeDataRegionTsFileSource.java        |  1 +
 .../realtime/assigner/PipeDataRegionAssigner.java  | 16 -----
 .../PipeRealtimeReplicateIndexAssignmentTest.java  | 79 ++++++++++++++++++++++
 10 files changed, 222 insertions(+), 28 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
index 5043becc1fa..47105dda1c5 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandler.java
@@ -299,15 +299,22 @@ public class RegionMaintainHandler {
     TMaintainPeerReq maintainPeerReq =
         new TMaintainPeerReq(regionId, originalDataNode, procedureId);
 
-    // Always use full retries regardless of node status, because after a cluster crash the
-    // target DataNode may be Unknown but still in the process of restarting.
+    final NodeStatus nodeStatus = getDataNodeStatus(originalDataNode.getDataNodeId());
+    final boolean useFullRetry = !NodeStatus.Unknown.equals(nodeStatus);
+    if (!useFullRetry) {
+      LOGGER.info(
+          "{}, DataNode {} is {}, submit DELETE_OLD_REGION_PEER with a single RPC attempt and let RemoveRegionPeerProcedure handle retries.",
+          REGION_MIGRATE_PROCESS,
+          simplifiedLocation(originalDataNode),
+          nodeStatus);
+    }
+
     status =
-        (TSStatus)
-            SyncDataNodeClientPool.getInstance()
-                .sendSyncRequestToDataNodeWithRetry(
-                    originalDataNode.getInternalEndPoint(),
-                    maintainPeerReq,
-                    CnToDnSyncRequestType.DELETE_OLD_REGION_PEER);
+        submitDataNodeSyncRequest(
+            originalDataNode.getInternalEndPoint(),
+            maintainPeerReq,
+            CnToDnSyncRequestType.DELETE_OLD_REGION_PEER,
+            useFullRetry);
     LOGGER.info(
         "{}, Send action deleteOldRegionPeer finished, regionId: {}, dataNodeId: {}",
         REGION_MIGRATE_PROCESS,
@@ -316,6 +323,20 @@ public class RegionMaintainHandler {
     return status;
   }
 
+  protected NodeStatus getDataNodeStatus(int dataNodeId) {
+    return configManager.getLoadManager().getNodeStatus(dataNodeId);
+  }
+
+  protected TSStatus submitDataNodeSyncRequest(
+      TEndPoint endPoint, Object request, CnToDnSyncRequestType requestType, boolean useFullRetry) {
+    return (TSStatus)
+        (useFullRetry
+            ? SyncDataNodeClientPool.getInstance()
+                .sendSyncRequestToDataNodeWithRetry(endPoint, request, requestType)
+            : SyncDataNodeClientPool.getInstance()
+                .sendSyncRequestToDataNodeWithGivenRetry(endPoint, request, requestType, 1));
+  }
+
   public Map<Integer, TSStatus> resetPeerList(
       TConsensusGroupId regionId,
       List<TDataNodeLocation> correctDataNodeLocations,
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
index b384b9e0f0a..a99d6f52fb8 100644
--- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
+++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/env/RegionMaintainHandlerConsensusPipeTest.java
@@ -24,11 +24,15 @@ import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.cluster.NodeStatus;
 import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.pipe.agent.task.meta.PipeStatus;
+import org.apache.iotdb.confignode.client.sync.CnToDnSyncRequestType;
 import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
 import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.manager.ProcedureManager;
+import org.apache.iotdb.confignode.manager.load.LoadManager;
 import org.apache.iotdb.confignode.manager.partition.PartitionManager;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.PipeManager;
 import org.apache.iotdb.confignode.manager.pipe.coordinator.task.PipeTaskCoordinator;
@@ -45,6 +49,9 @@ import java.util.HashMap;
 import java.util.Map;
 
 import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS_V2;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -59,7 +66,8 @@ public class RegionMaintainHandlerConsensusPipeTest {
   private PipeManager pipeManager;
   private PipeTaskCoordinator pipeTaskCoordinator;
   private ProcedureManager procedureManager;
-  private RegionMaintainHandler handler;
+  private LoadManager loadManager;
+  private TestRegionMaintainHandler handler;
 
   private String originalConsensusProtocol;
 
@@ -76,13 +84,15 @@ public class RegionMaintainHandlerConsensusPipeTest {
     pipeManager = mock(PipeManager.class);
     pipeTaskCoordinator = mock(PipeTaskCoordinator.class);
     procedureManager = mock(ProcedureManager.class);
+    loadManager = mock(LoadManager.class);
 
     when(configManager.getPartitionManager()).thenReturn(partitionManager);
     when(configManager.getPipeManager()).thenReturn(pipeManager);
     when(configManager.getProcedureManager()).thenReturn(procedureManager);
+    when(configManager.getLoadManager()).thenReturn(loadManager);
     when(pipeManager.getPipeTaskCoordinator()).thenReturn(pipeTaskCoordinator);
 
-    handler = new RegionMaintainHandler(configManager);
+    handler = new TestRegionMaintainHandler(configManager);
   }
 
   @After
@@ -254,6 +264,34 @@ public class RegionMaintainHandlerConsensusPipeTest {
     verify(procedureManager, times(1)).startConsensusPipe(pipe2to1);
   }
 
+  @Test
+  public void testDeleteOldRegionPeerUsesSingleAttemptWhenNodeUnknown() {
+    TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+    when(loadManager.getNodeStatus(loc1.getDataNodeId())).thenReturn(NodeStatus.Unknown);
+
+    handler.submitDeleteOldRegionPeerTask(
+        1L, loc1, new TConsensusGroupId(TConsensusGroupType.DataRegion, 100));
+
+    verify(loadManager, times(1)).getNodeStatus(loc1.getDataNodeId());
+    assertFalse(handler.lastUseFullRetry);
+    assertEquals(CnToDnSyncRequestType.DELETE_OLD_REGION_PEER, handler.lastRequestType);
+    assertEquals(loc1.getInternalEndPoint(), handler.lastEndPoint);
+  }
+
+  @Test
+  public void testDeleteOldRegionPeerKeepsFullRetryWhenNodeRunning() {
+    TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
+    when(loadManager.getNodeStatus(loc1.getDataNodeId())).thenReturn(NodeStatus.Running);
+
+    handler.submitDeleteOldRegionPeerTask(
+        1L, loc1, new TConsensusGroupId(TConsensusGroupType.DataRegion, 100));
+
+    verify(loadManager, times(1)).getNodeStatus(loc1.getDataNodeId());
+    assertTrue(handler.lastUseFullRetry);
+    assertEquals(CnToDnSyncRequestType.DELETE_OLD_REGION_PEER, handler.lastRequestType);
+    assertEquals(loc1.getInternalEndPoint(), handler.lastEndPoint);
+  }
+
   @Test
   public void testThreeNodeReplicaSetCreatesAllSixPipes() {
     TDataNodeLocation loc1 = makeLocation(1, "127.0.0.1", 40010);
@@ -287,4 +325,26 @@ public class RegionMaintainHandlerConsensusPipeTest {
     verify(procedureManager, never()).dropConsensusPipeAsync(any());
     verify(procedureManager, never()).startConsensusPipe(any());
   }
+
+  private static class TestRegionMaintainHandler extends RegionMaintainHandler {
+    private boolean lastUseFullRetry;
+    private TEndPoint lastEndPoint;
+    private CnToDnSyncRequestType lastRequestType;
+
+    private TestRegionMaintainHandler(ConfigManager configManager) {
+      super(configManager);
+    }
+
+    @Override
+    protected TSStatus submitDataNodeSyncRequest(
+        TEndPoint endPoint,
+        Object request,
+        CnToDnSyncRequestType requestType,
+        boolean useFullRetry) {
+      lastEndPoint = endPoint;
+      lastRequestType = requestType;
+      lastUseFullRetry = useFullRetry;
+      return new TSStatus(200);
+    }
+  }
 }
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index a3410217868..256b918c02c 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -273,7 +273,7 @@ public class IoTConsensus implements IConsensus {
 
                   String path = buildPeerDir(storageDir, groupId);
                   File file = new File(path);
-                  if (!file.mkdirs()) {
+                  if (!file.exists() && !file.mkdirs()) {
                     logger.warn("Unable to create consensus dir for group {} at {}", groupId, path);
                     return null;
                   }
diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
index 5147632431f..1351b674312 100644
--- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
+++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/StabilityTest.java
@@ -186,6 +186,19 @@ public class StabilityTest {
     consensusImpl.deleteLocalPeer(dataRegionId);
   }
 
+  @Test
+  public void createLocalPeerShouldAllowExistingConsensusDir() throws Exception {
+    File existingPeerDir = new File(IoTConsensus.buildPeerDir(storageDir, dataRegionId));
+    Assert.assertTrue(existingPeerDir.mkdirs());
+
+    consensusImpl.createLocalPeer(
+        dataRegionId,
+        Collections.singletonList(new Peer(dataRegionId, 1, new TEndPoint("0.0.0.0", basePort))));
+
+    Assert.assertEquals(1, consensusImpl.getReplicationNum(dataRegionId));
+    consensusImpl.deleteLocalPeer(dataRegionId);
+  }
+
   public void transferLeader() {
     try {
       consensusImpl.transferLeader(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index 97b6d54fde5..2d0ee0dc6b2 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -217,7 +217,7 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo
     PipeRealtimeEvent realtimeEvent = (PipeRealtimeEvent) pendingQueue.directPoll();
 
     while (realtimeEvent != null) {
-      final Event suppliedEvent;
+      Event suppliedEvent;
 
       // Used to judge the type of the event, not directly for supplying.
       final Event eventToSupply = realtimeEvent.getEvent();
@@ -241,6 +241,7 @@ public class PipeRealtimeDataRegionHybridSource extends PipeRealtimeDataRegionSo
           PipeRealtimeDataRegionHybridSource.class.getName(), false);
 
       if (suppliedEvent != null) {
+        suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent, suppliedEvent);
         maySkipIndex4Event(realtimeEvent);
         return suppliedEvent;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
index 579310b3f15..3d9c81bcb0c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionLogSource.java
@@ -123,6 +123,7 @@ public class PipeRealtimeDataRegionLogSource extends PipeRealtimeDataRegionSourc
       realtimeEvent.decreaseReferenceCount(PipeRealtimeDataRegionLogSource.class.getName(), false);
 
       if (suppliedEvent != null) {
+        suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent, suppliedEvent);
         return suppliedEvent;
       }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index da5b81c6e36..a1eebcbdbd5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -33,12 +33,16 @@ import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.queryengine.utils.DateTimeUtils;
 import org.apache.iotdb.commons.utils.PathUtils;
 import org.apache.iotdb.commons.utils.TimePartitionUtils;
+import org.apache.iotdb.consensus.pipe.IoTConsensusV2;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
 import org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
 import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
@@ -463,6 +467,36 @@ public abstract class PipeRealtimeDataRegionSource implements PipeExtractor {
     }
   }
 
+  protected Event assignReplicateIndexIfNeeded(
+      final PipeRealtimeEvent realtimeEvent, final Event suppliedEvent) {
+    if (!(suppliedEvent instanceof EnrichedEvent) || !shouldAssignReplicateIndex(suppliedEvent)) {
+      return suppliedEvent;
+    }
+
+    final EnrichedEvent enrichedEvent = (EnrichedEvent) suppliedEvent;
+    if (enrichedEvent.getReplicateIndexForIoTV2() != EnrichedEvent.NO_COMMIT_ID) {
+      return suppliedEvent;
+    }
+
+    enrichedEvent.setReplicateIndexForIoTV2(assignReplicateIndexForRealtimeEvent());
+    LOGGER.debug(
+        "[{}]Set {} for realtime event {}",
+        pipeName,
+        enrichedEvent.getReplicateIndexForIoTV2(),
+        realtimeEvent.coreReportMessage());
+    return suppliedEvent;
+  }
+
+  protected boolean shouldAssignReplicateIndex(final Event suppliedEvent) {
+    return !(suppliedEvent instanceof ProgressReportEvent)
+        && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+        && IoTConsensusV2Processor.isShouldReplicate((EnrichedEvent) suppliedEvent);
+  }
+
+  protected long assignReplicateIndexForRealtimeEvent() {
+    return ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName);
+  }
+
   protected Event supplyHeartbeat(final PipeRealtimeEvent event) {
     if (event.increaseReferenceCount(PipeRealtimeDataRegionSource.class.getName())) {
       return event.getEvent();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 97c3138de7c..d70d93db548 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -114,6 +114,7 @@ public class PipeRealtimeDataRegionTsFileSource extends PipeRealtimeDataRegionSo
           PipeRealtimeDataRegionTsFileSource.class.getName(), false);
 
       if (suppliedEvent != null) {
+        suppliedEvent = assignReplicateIndexIfNeeded(realtimeEvent, suppliedEvent);
         maySkipIndex4Event(realtimeEvent);
         return suppliedEvent;
       }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index f40de994c31..9c356e8654b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -24,9 +24,6 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
 import org.apache.iotdb.commons.utils.PathUtils;
-import org.apache.iotdb.consensus.pipe.IoTConsensusV2;
-import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
 import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
 import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -36,7 +33,6 @@ import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
 import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
 import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics;
 import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
-import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.PipeRealtimeDataRegionSource;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher.CachedSchemaPatternMatcher;
 import org.apache.iotdb.db.pipe.source.dataregion.realtime.matcher.PipeDataRegionMatcher;
@@ -174,18 +170,6 @@ public class PipeDataRegionAssigner implements Closeable {
                       source.getRealtimeDataExtractionStartTime(),
                       source.getRealtimeDataExtractionEndTime());
               final EnrichedEvent innerEvent = copiedEvent.getEvent();
-              // if using IoTV2, assign a replicateIndex for this realtime event
-              if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
-                  && IoTConsensusV2Processor.isShouldReplicate(innerEvent)) {
-                innerEvent.setReplicateIndexForIoTV2(
-                    ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
-                        source.getPipeName()));
-                LOGGER.debug(
-                    "[{}]Set {} for realtime event {}",
-                    source.getPipeName(),
-                    innerEvent.getReplicateIndexForIoTV2(),
-                    innerEvent);
-              }
 
               if (innerEvent instanceof PipeTsFileInsertionEvent) {
                 final PipeTsFileInsertionEvent tsFileInsertionEvent =
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java
new file mode 100644
index 00000000000..b58886afb3b
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeReplicateIndexAssignmentTest.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime;
+
+import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
+import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
+import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
+import org.apache.iotdb.db.pipe.source.dataregion.realtime.epoch.TsFileEpoch;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class PipeRealtimeReplicateIndexAssignmentTest {
+
+  @Test
+  public void assignReplicateIndexShouldBeLazyAndIdempotent() {
+    final TestPipeRealtimeDataRegionSource source = new TestPipeRealtimeDataRegionSource();
+    final PipeDeleteDataNodeEvent event = new PipeDeleteDataNodeEvent();
+
+    Assert.assertEquals(EnrichedEvent.NO_COMMIT_ID, event.getReplicateIndexForIoTV2());
+
+    final Event suppliedEvent = source.assign(event);
+    Assert.assertSame(event, suppliedEvent);
+    Assert.assertEquals(1L, event.getReplicateIndexForIoTV2());
+    Assert.assertEquals(1L, source.assignedCount.get());
+
+    source.assign(event);
+    Assert.assertEquals(1L, event.getReplicateIndexForIoTV2());
+    Assert.assertEquals(1L, source.assignedCount.get());
+  }
+
+  private static class TestPipeRealtimeDataRegionSource extends PipeRealtimeDataRegionLogSource {
+    private final AtomicLong nextReplicateIndex = new AtomicLong(1);
+    private final AtomicLong assignedCount = new AtomicLong(0);
+
+    private Event assign(final PipeDeleteDataNodeEvent event) {
+      final TsFileResource resource = mock(TsFileResource.class);
+      when(resource.getTsFilePath()).thenReturn("target/test.tsfile");
+      final PipeRealtimeEvent realtimeEvent =
+          new PipeRealtimeEvent(event, new TsFileEpoch(resource), null);
+      return assignReplicateIndexIfNeeded(realtimeEvent, event);
+    }
+
+    @Override
+    protected boolean shouldAssignReplicateIndex(final Event suppliedEvent) {
+      return true;
+    }
+
+    @Override
+    protected long assignReplicateIndexForRealtimeEvent() {
+      assignedCount.incrementAndGet();
+      return nextReplicateIndex.getAndIncrement();
+    }
+  }
+}

Reply via email to