This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new b8fd08984cb IoTV2: Refactor replicate index so that it is shared at
the pipe task level & Add some log for delete local peer (#15815)
b8fd08984cb is described below
commit b8fd08984cb2e87aabbd98337c3ada32fbb9cfd1
Author: Peng Junzhi <[email protected]>
AuthorDate: Thu Jun 26 09:33:48 2025 +0800
IoTV2: Refactor replicate index so that it is shared at the pipe task level
& Add some log for delete local peer (#15815)
* use custom replicate index for each consensus pipe
* test conf
* fix
* Revert "test conf"
This reverts commit f0f13af4f2e7edd55d561fafb6a01bf5048171dd.
* fix review
---
.../apache/iotdb/consensus/pipe/PipeConsensus.java | 3 +-
.../pipe/consensuspipe/ConsensusPipeConnector.java | 4 +-
.../consensuspipe/ReplicateProgressManager.java | 2 +-
.../pipe/metric/PipeConsensusSyncLagManager.java | 12 +++---
.../connector/PipeConnectorSubtaskManager.java | 4 ++
.../pipeconsensus/PipeConsensusAsyncConnector.java | 13 ++----
.../ReplicateProgressDataNodeManager.java | 21 ++++++---
.../event/realtime/PipeRealtimeEventFactory.java | 50 +---------------------
...oricalDataRegionTsFileAndDeletionExtractor.java | 8 +---
.../realtime/assigner/PipeDataRegionAssigner.java | 18 +++++++-
.../listener/PipeInsertionDataNodeListener.java | 11 ++---
.../task/progress/PipeEventCommitManager.java | 11 -----
12 files changed, 55 insertions(+), 102 deletions(-)
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
index e7a10d3b9d3..56792236c23 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/PipeConsensus.java
@@ -375,13 +375,14 @@ public class PipeConsensus implements IConsensus {
if (!stateMachineMap.containsKey(groupId)) {
throw new ConsensusGroupNotExistException(groupId);
}
-
+ LOGGER.info("[{}] start to delete local peer for group {}",
CLASS_NAME, groupId);
final PipeConsensusServerImpl consensus = stateMachineMap.get(groupId);
consensus.clear();
stateMachineMap.remove(groupId);
FileUtils.deleteFileOrDirectory(new File(getPeerDir(groupId)));
KillPoint.setKillPoint(IoTConsensusDeleteLocalPeerKillPoints.AFTER_DELETE);
+ LOGGER.info("[{}] finish deleting local peer for group {}",
CLASS_NAME, groupId);
} finally {
stateMachineMapLock.readLock().unlock();
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
index 6f1396db972..bf4cd02b2ec 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ConsensusPipeConnector.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.consensus.pipe.consensuspipe;
public interface ConsensusPipeConnector {
- long getConsensusPipeCommitProgress();
+ long getLeaderReplicateProgress();
- long getConsensusPipeReplicateProgress();
+ long getFollowerApplyProgress();
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
index 13a18132272..9ae2964e89d 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/consensuspipe/ReplicateProgressManager.java
@@ -32,6 +32,6 @@ public interface ReplicateProgressManager {
long getSyncLagForSpecificConsensusPipe(
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);
- void pinCommitIndexForMigration(
+ void pinReplicateIndexForRegionMigration(
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName);
}
diff --git
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
index 8f6cb651ab6..0539bece918 100644
---
a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
+++
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/pipe/metric/PipeConsensusSyncLagManager.java
@@ -48,9 +48,7 @@ public class PipeConsensusSyncLagManager {
return
Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
.map(
consensusPipeConnector ->
- Math.max(
- pinnedCommitIndex -
consensusPipeConnector.getConsensusPipeReplicateProgress(),
- 0L))
+ Math.max(pinnedCommitIndex -
consensusPipeConnector.getFollowerApplyProgress(), 0L))
.orElse(0L);
}
@@ -62,16 +60,16 @@ public class PipeConsensusSyncLagManager {
return
Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
.map(
consensusPipeConnector -> {
- long userWriteProgress =
consensusPipeConnector.getConsensusPipeCommitProgress();
- long replicateProgress =
consensusPipeConnector.getConsensusPipeReplicateProgress();
+ long userWriteProgress =
consensusPipeConnector.getLeaderReplicateProgress();
+ long replicateProgress =
consensusPipeConnector.getFollowerApplyProgress();
return Math.max(userWriteProgress - replicateProgress, 0L);
})
.orElse(0L);
}
- public long getCurrentCommitIndex(ConsensusPipeName consensusPipeName) {
+ public long getCurrentLeaderReplicateIndex(ConsensusPipeName
consensusPipeName) {
return
Optional.ofNullable(consensusPipe2ConnectorMap.get(consensusPipeName))
- .map(ConsensusPipeConnector::getConsensusPipeCommitProgress)
+ .map(ConsensusPipeConnector::getLeaderReplicateProgress)
.orElse(0L);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
index 3a2cd2639f4..dd461e7f952 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/subtask/connector/PipeConnectorSubtaskManager.java
@@ -31,6 +31,7 @@ import
org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeE
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.agent.task.execution.PipeConnectorSubtaskExecutor;
import
org.apache.iotdb.db.pipe.connector.protocol.thrift.async.IoTDBDataRegionAsyncConnector;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.pipe.api.PipeConnector;
@@ -201,6 +202,9 @@ public class PipeConnectorSubtaskManager {
}
PipeEventCommitManager.getInstance().deregister(pipeName, creationTime,
regionId);
+ // Reset IoTV2 replicate index to prevent index jumps. Do this when a
consensus pipe no longer
+ // replicates data, since extractor and processor are already dropped now.
+ ReplicateProgressDataNodeManager.resetReplicateIndexForIoTV2(pipeName);
}
public synchronized void start(final String attributeSortedString) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
index 6ed86376c5a..bf1dab239f9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/pipeconsensus/PipeConsensusAsyncConnector.java
@@ -28,7 +28,6 @@ import
org.apache.iotdb.commons.client.container.IoTV2GlobalComponentContainer;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
import
org.apache.iotdb.commons.exception.pipe.PipeRuntimeConnectorRetryTimesConfigurableException;
-import
org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBConnector;
import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.service.metric.MetricService;
@@ -39,7 +38,6 @@ import org.apache.iotdb.consensus.pipe.thrift.TCommitId;
import org.apache.iotdb.consensus.pipe.thrift.TPipeConsensusTransferReq;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
-import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusDeleteEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletBatchEventHandler;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.handler.PipeConsensusTabletInsertNodeEventHandler;
@@ -48,6 +46,7 @@ import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.builder
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusDeleteNodeReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletBinaryReq;
import
org.apache.iotdb.db.pipe.connector.protocol.pipeconsensus.payload.request.PipeConsensusTabletInsertNodeReq;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.metric.PipeConsensusConnectorMetrics;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -726,16 +725,12 @@ public class PipeConsensusAsyncConnector extends
IoTDBConnector implements Conse
}
@Override
- public long getConsensusPipeCommitProgress() {
- return PipeEventCommitManager.getInstance()
- .getGivenConsensusPipeCommitId(
- consensusPipeName,
- PipeDataNodeAgent.task().getPipeCreationTime(consensusPipeName),
- consensusGroupId);
+ public long getLeaderReplicateProgress() {
+ return
ReplicateProgressDataNodeManager.getReplicateIndexForIoTV2(consensusPipeName);
}
@Override
- public long getConsensusPipeReplicateProgress() {
+ public long getFollowerApplyProgress() {
return currentReplicateProgress;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
index ca6153d804a..29347cfd5f1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/ReplicateProgressDataNodeManager.java
@@ -44,7 +44,8 @@ import java.util.stream.Collectors;
public class ReplicateProgressDataNodeManager implements
ReplicateProgressManager {
private static final int DATA_NODE_ID =
IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
- private static final Map<String, AtomicLong> groupId2ReplicateIndex = new
ConcurrentHashMap<>();
+ private static final Map<String, AtomicLong> consensusPipe2ReplicateIndex =
+ new ConcurrentHashMap<>();
private final Map<ConsensusGroupId, ProgressIndex> groupId2MaxProgressIndex;
private final Map<ConsensusPipeName, Long>
consensusPipe2pinnedCommitIndexForMigration;
@@ -55,12 +56,20 @@ public class ReplicateProgressDataNodeManager implements
ReplicateProgressManage
recoverMaxProgressIndexFromDataRegion();
}
- public static long assignReplicateIndexForIoTV2(String groupId) {
- return groupId2ReplicateIndex
- .compute(groupId, (k, v) -> v == null ? new AtomicLong(0) : v)
+ public static long assignReplicateIndexForIoTV2(String consensusPipeName) {
+ return consensusPipe2ReplicateIndex
+ .compute(consensusPipeName, (k, v) -> v == null ? new AtomicLong(0) :
v)
.incrementAndGet();
}
+ public static void resetReplicateIndexForIoTV2(String consensusPipeName) {
+ consensusPipe2ReplicateIndex.put(consensusPipeName, new AtomicLong(0));
+ }
+
+ public static long getReplicateIndexForIoTV2(String consensusPipeName) {
+ return consensusPipe2ReplicateIndex.getOrDefault(consensusPipeName, new
AtomicLong(0)).get();
+ }
+
public static ProgressIndex extractLocalSimpleProgressIndex(ProgressIndex
progressIndex) {
if (progressIndex instanceof RecoverProgressIndex) {
final Map<Integer, SimpleProgressIndex> dataNodeId2LocalIndex =
@@ -151,11 +160,11 @@ public class ReplicateProgressDataNodeManager implements
ReplicateProgressManage
}
@Override
- public void pinCommitIndexForMigration(
+ public void pinReplicateIndexForRegionMigration(
ConsensusGroupId consensusGroupId, ConsensusPipeName consensusPipeName) {
this.consensusPipe2pinnedCommitIndexForMigration.put(
consensusPipeName,
PipeConsensusSyncLagManager.getInstance(consensusGroupId.toString())
- .getCurrentCommitIndex(consensusPipeName));
+ .getCurrentLeaderReplicateIndex(consensusPipeName));
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
index fa8a4e00024..cde00ee38d0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEventFactory.java
@@ -20,15 +20,11 @@
package org.apache.iotdb.db.pipe.event.realtime;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
-import org.apache.iotdb.consensus.pipe.PipeConsensus;
-import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
-import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpochManager;
-import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.AbstractDeleteDataNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsNode;
@@ -36,18 +32,12 @@ import
org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.wal.utils.WALEntryHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.util.stream.Collectors;
public class PipeRealtimeEventFactory {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(PipeRealtimeEventFactory.class);
private static final TsFileEpochManager TS_FILE_EPOCH_MANAGER = new
TsFileEpochManager();
public static PipeRealtimeEvent createRealtimeEvent(
- final String dataRegionId,
final Boolean isTableModel,
final String databaseNameFromDataRegion,
final TsFileResource resource,
@@ -56,23 +46,10 @@ public class PipeRealtimeEventFactory {
new PipeTsFileInsertionEvent(
isTableModel, databaseNameFromDataRegion, resource, isLoaded,
false);
- // if using IoTV2, assign a replicateIndex for this event
- if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
- && PipeConsensusProcessor.isShouldReplicate(tsFileInsertionEvent)) {
- tsFileInsertionEvent.setReplicateIndexForIoTV2(
-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
- LOGGER.info(
- "[Region{}]Set {} for event {}",
- dataRegionId,
- tsFileInsertionEvent.getReplicateIndexForIoTV2(),
- tsFileInsertionEvent);
- }
-
return
TS_FILE_EPOCH_MANAGER.bindPipeTsFileInsertionEvent(tsFileInsertionEvent,
resource);
}
public static PipeRealtimeEvent createRealtimeEvent(
- final String dataRegionId,
final Boolean isTableModel,
final String databaseNameFromDataRegion,
final WALEntryHandler walEntryHandler,
@@ -98,18 +75,6 @@ public class PipeRealtimeEventFactory {
insertNode.isAligned(),
insertNode.isGeneratedByPipe());
- // if using IoTV2, assign a replicateIndex for this event
- if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
- && PipeConsensusProcessor.isShouldReplicate(insertionEvent)) {
- insertionEvent.setReplicateIndexForIoTV2(
-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
- LOGGER.info(
- "[Region{}]Set {} for event {}",
- dataRegionId,
- insertionEvent.getReplicateIndexForIoTV2(),
- insertionEvent);
- }
-
return TS_FILE_EPOCH_MANAGER.bindPipeInsertNodeTabletInsertionEvent(
insertionEvent, insertNode, resource);
}
@@ -120,23 +85,10 @@ public class PipeRealtimeEventFactory {
new PipeHeartbeatEvent(dataRegionId, shouldPrintMessage), null, null);
}
- public static PipeRealtimeEvent createRealtimeEvent(
- final String dataRegionId, final AbstractDeleteDataNode node) {
+ public static PipeRealtimeEvent createRealtimeEvent(final
AbstractDeleteDataNode node) {
PipeDeleteDataNodeEvent deleteDataNodeEvent =
new PipeDeleteDataNodeEvent(node, node.isGeneratedByPipe());
- // if using IoTV2, assign a replicateIndex for this event
- if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
- && PipeConsensusProcessor.isShouldReplicate(deleteDataNodeEvent)) {
- deleteDataNodeEvent.setReplicateIndexForIoTV2(
-
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(dataRegionId));
- LOGGER.info(
- "[Region{}]Set {} for event {}",
- dataRegionId,
- deleteDataNodeEvent.getReplicateIndexForIoTV2(),
- deleteDataNodeEvent);
- }
-
return new PipeRealtimeEvent(deleteDataNodeEvent, null, null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
index dc9b7318acc..79e20880689 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionExtractor.java
@@ -877,13 +877,9 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionExtractor
if (DataRegionConsensusImpl.getInstance() instanceof PipeConsensus
&& PipeConsensusProcessor.isShouldReplicate(event)) {
event.setReplicateIndexForIoTV2(
- ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
- resource.getDataRegionId()));
+
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(pipeName));
LOGGER.info(
- "[Region{}]Set {} for event {}",
- resource.getDataRegionId(),
- event.getReplicateIndexForIoTV2(),
- event);
+ "[{}]Set {} for historical event {}", pipeName,
event.getReplicateIndexForIoTV2(), event);
}
if (sloppyPattern || isDbNameCoveredByPattern) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index c76652aa0e2..060b395e51a 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -27,6 +27,9 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.consensus.pipe.PipeConsensus;
+import org.apache.iotdb.db.consensus.DataRegionConsensusImpl;
+import org.apache.iotdb.db.pipe.consensus.ReplicateProgressDataNodeManager;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
import org.apache.iotdb.db.pipe.event.common.deletion.PipeDeleteDataNodeEvent;
@@ -40,6 +43,7 @@ import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.CachedSche
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.matcher.PipeDataRegionMatcher;
import org.apache.iotdb.db.pipe.metric.source.PipeAssignerMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionEventCounter;
+import org.apache.iotdb.db.pipe.processor.pipeconsensus.PipeConsensusProcessor;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -191,8 +195,18 @@ public class PipeDataRegionAssigner implements Closeable {
extractor.getRealtimeDataExtractionStartTime(),
extractor.getRealtimeDataExtractionEndTime());
final EnrichedEvent innerEvent = copiedEvent.getEvent();
- // Bind replicateIndex for IoTV2
-
innerEvent.setReplicateIndexForIoTV2(event.getEvent().getReplicateIndexForIoTV2());
+ // if using IoTV2, assign a replicateIndex for this realtime
event
+ if (DataRegionConsensusImpl.getInstance() instanceof
PipeConsensus
+ && PipeConsensusProcessor.isShouldReplicate(innerEvent)) {
+ innerEvent.setReplicateIndexForIoTV2(
+
ReplicateProgressDataNodeManager.assignReplicateIndexForIoTV2(
+ extractor.getPipeName()));
+ LOGGER.info(
+ "[{}]Set {} for realtime event {}",
+ extractor.getPipeName(),
+ innerEvent.getReplicateIndexForIoTV2(),
+ innerEvent);
+ }
if (innerEvent instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index f1f8c22ac51..90d378ecf7d 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -116,7 +116,7 @@ public class PipeInsertionDataNodeListener {
assigner.publishToAssign(
PipeRealtimeEventFactory.createRealtimeEvent(
- dataRegionId, assigner.isTableModel(), databaseName,
tsFileResource, isLoaded));
+ assigner.isTableModel(), databaseName, tsFileResource, isLoaded));
}
public void listenToInsertNode(
@@ -138,12 +138,7 @@ public class PipeInsertionDataNodeListener {
assigner.publishToAssign(
PipeRealtimeEventFactory.createRealtimeEvent(
- dataRegionId,
- assigner.isTableModel(),
- databaseName,
- walEntryHandler,
- insertNode,
- tsFileResource));
+ assigner.isTableModel(), databaseName, walEntryHandler,
insertNode, tsFileResource));
}
public DeletionResource listenToDeleteData(
@@ -169,7 +164,7 @@ public class PipeInsertionDataNodeListener {
deletionResource = null;
}
-
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(regionId,
node));
+
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node));
return deletionResource;
}
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
index b37bd07d1d9..31ee7bfeced 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/task/progress/PipeEventCommitManager.java
@@ -176,17 +176,6 @@ public class PipeEventCommitManager {
this.commitRateMarker = commitRateMarker;
}
- public long getGivenConsensusPipeCommitId(
- final String consensusPipeName, final long creationTime, final int
consensusGroupId) {
- final CommitterKey committerKey =
- generateCommitterKey(consensusPipeName, creationTime,
consensusGroupId);
- final PipeEventCommitter committer = eventCommitterMap.get(committerKey);
- if (committer == null) {
- return 0;
- }
- return committer.getCurrentCommitId();
- }
-
//////////////////////////// singleton ////////////////////////////
private PipeEventCommitManager() {