This is an automated email from the ASF dual-hosted git repository. jackietien pushed a commit to branch rc/2.0.8 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 0011eb7b6d95e928831a5d7abbf89df1188f0216 Author: Peng Junzhi <[email protected]> AuthorDate: Fri Mar 20 05:55:32 2026 -0500 fix: pick deletion event for historical resend (#17329) --- .../sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java | 10 +++++++++- .../PipeHistoricalDataRegionTsFileAndDeletionSource.java | 12 ++++++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java index a481f7cde0d..a5598f87d84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/pipeconsensus/PipeConsensusAsyncSink.java @@ -236,7 +236,15 @@ public class PipeConsensusAsyncSink extends IoTDBSink implements ConsensusPipeSi while (!current.equalsInPipeConsensus(event) && iterator.hasNext()) { current = iterator.next(); } - iterator.remove(); + if (current.equalsInIoTConsensusV2(event)) { + iterator.remove(); + } else { + LOGGER.warn( + "IoTConsensusV2-ConsensusGroup-{}: event-{} not found in transferBuffer, skip removing. queue size = {}", + consensusGroupId, + event, + transferBuffer.size()); + } // update replicate progress currentReplicateProgress = Math.max(currentReplicateProgress, event.getReplicateIndexForIoTV2()); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index ab74d094740..cc59f885686 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -929,6 +929,18 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource skipIfNoPrivileges, false); + // if using IoTV2, assign a replicateIndex for this historical deletion event + if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 + && IoTConsensusV2Processor.isShouldReplicate(event)) { + event.setReplicateIndexForIoTV2( + ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName)); + LOGGER.debug( + "[{}]Set {} for historical deletion event {}", + pipeName, + event.getReplicateIndexForIoTV2(), + event); + } + if (sloppyPattern || isDbNameCoveredByPattern) { event.skipParsingPattern(); }
