This is an automated email from the ASF dual-hosted git repository.
rong pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2c6ad830d0d Pipe: Fixed the bug that the insertion for newer tsFile in
one region may report progress beyond the older tsFile when it is not flushed
(#15515) (#15596)
2c6ad830d0d is described below
commit 2c6ad830d0dee5dbef679d1fb107596c231fa5eb
Author: Caideyipi <[email protected]>
AuthorDate: Thu May 29 18:15:13 2025 +0800
Pipe: Fixed the bug that the insertion for newer tsFile in one region may
report progress beyond the older tsFile when it is not flushed (#15515) (#15596)
---
.../common/tsfile/PipeTsFileInsertionEvent.java | 9 ++--
.../db/pipe/event/realtime/PipeRealtimeEvent.java | 5 ++
.../PipeRealtimeDataRegionHybridExtractor.java | 3 ++
.../realtime/assigner/PipeDataRegionAssigner.java | 34 ++++++++++----
...ava => PipeTsFileEpochProgressIndexKeeper.java} | 53 ++++++++--------------
.../dataregion/realtime/epoch/TsFileEpoch.java | 14 ++++--
.../dataregion/tsfile/TsFileResource.java | 10 ++--
7 files changed, 67 insertions(+), 61 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 20dee1bf567..b648af30e6f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -30,7 +30,7 @@ import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainer;
import
org.apache.iotdb.db.pipe.event.common.tsfile.container.TsFileInsertionDataContainerProvider;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryManager;
@@ -342,11 +342,8 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
public void eliminateProgressIndex() {
if (Objects.isNull(overridingProgressIndex)) {
- PipeTimePartitionProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(
- resource.getDataRegionId(),
- resource.getTimePartition(),
- resource.getMaxProgressIndexAfterClose());
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .eliminateProgressIndex(resource.getDataRegionId(),
resource.getTsFilePath());
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
index 3818d0ac53b..d4a39193467 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/realtime/PipeRealtimeEvent.java
@@ -130,6 +130,11 @@ public class PipeRealtimeEvent extends EnrichedEvent {
return event.internallyDecreaseResourceReferenceCount(holderMessage);
}
+ @Override
+ public void bindProgressIndex(final ProgressIndex progressIndex) {
+ event.bindProgressIndex(progressIndex);
+ }
+
@Override
public ProgressIndex getProgressIndex() {
return event.getProgressIndex();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
index a4cebff2806..825dfd11345 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/PipeRealtimeDataRegionHybridExtractor.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEve
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.extractor.dataregion.IoTDBDataRegionExtractor;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.epoch.TsFileEpoch;
import
org.apache.iotdb.db.pipe.metric.overview.PipeDataNodeRemainingEventAndTimeMetrics;
import org.apache.iotdb.db.pipe.metric.source.PipeDataRegionExtractorMetrics;
@@ -471,6 +472,8 @@ public class PipeRealtimeDataRegionHybridExtractor extends
PipeRealtimeDataRegio
switch (state) {
case USING_TSFILE:
// If the state is USING_TSFILE, discard the event and poll the next one.
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .eliminateProgressIndex(dataRegionId,
event.getTsFileEpoch().getFilePath());
return null;
case EMPTY:
case USING_TABLET:
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 401cf9d4eb6..ed91f636ac1 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -26,6 +26,7 @@ import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
import org.apache.iotdb.commons.pipe.metric.PipeEventCounter;
import org.apache.iotdb.db.pipe.event.common.heartbeat.PipeHeartbeatEvent;
+import
org.apache.iotdb.db.pipe.event.common.tablet.PipeInsertNodeTabletInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEvent;
import org.apache.iotdb.db.pipe.event.realtime.PipeRealtimeEventFactory;
@@ -61,7 +62,7 @@ public class PipeDataRegionAssigner implements Closeable {
private int counter = 0;
- private final AtomicReference<ProgressIndex>
maxProgressIndexForTsFileInsertionEvent =
+ private final AtomicReference<ProgressIndex>
maxProgressIndexForRealtimeEvent =
new AtomicReference<>(MinimumProgressIndex.INSTANCE);
private final PipeEventCounter eventCounter = new
PipeDataRegionEventCounter();
@@ -170,7 +171,11 @@ public class PipeDataRegionAssigner implements Closeable {
(PipeTsFileInsertionEvent) innerEvent;
tsFileInsertionEvent.disableMod4NonTransferPipes(
extractor.isShouldTransferModFile());
-
bindOrUpdateProgressIndexForTsFileInsertionEvent(tsFileInsertionEvent);
+ }
+
+ if (innerEvent instanceof PipeTsFileInsertionEvent
+ || innerEvent instanceof PipeInsertNodeTabletInsertionEvent)
{
+ bindOrUpdateProgressIndexForRealtimeEvent(copiedEvent);
}
if
(!copiedEvent.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
@@ -183,25 +188,34 @@ public class PipeDataRegionAssigner implements Closeable {
});
}
- private void bindOrUpdateProgressIndexForTsFileInsertionEvent(
- final PipeTsFileInsertionEvent event) {
- if (PipeTimePartitionProgressIndexKeeper.getInstance()
+ private void bindOrUpdateProgressIndexForRealtimeEvent(final
PipeRealtimeEvent event) {
+ if (PipeTsFileEpochProgressIndexKeeper.getInstance()
.isProgressIndexAfterOrEquals(
- dataRegionId, event.getTimePartitionId(),
event.forceGetProgressIndex())) {
- event.bindProgressIndex(maxProgressIndexForTsFileInsertionEvent.get());
+ dataRegionId,
+ event.getTsFileEpoch().getFilePath(),
+ getProgressIndex4RealtimeEvent(event))) {
+ event.bindProgressIndex(maxProgressIndexForRealtimeEvent.get());
if (LOGGER.isDebugEnabled()) {
LOGGER.debug(
"Data region {} bind {} to event {} because it was flushed
prematurely.",
dataRegionId,
- maxProgressIndexForTsFileInsertionEvent,
+ maxProgressIndexForRealtimeEvent,
event.coreReportMessage());
}
} else {
- maxProgressIndexForTsFileInsertionEvent.updateAndGet(
- index ->
index.updateToMinimumEqualOrIsAfterProgressIndex(event.forceGetProgressIndex()));
+ maxProgressIndexForRealtimeEvent.updateAndGet(
+ index ->
+ index.updateToMinimumEqualOrIsAfterProgressIndex(
+ getProgressIndex4RealtimeEvent(event)));
}
}
+ private ProgressIndex getProgressIndex4RealtimeEvent(final PipeRealtimeEvent
event) {
+ return event.getEvent() instanceof PipeTsFileInsertionEvent
+ ? ((PipeTsFileInsertionEvent) event.getEvent()).forceGetProgressIndex()
+ : event.getProgressIndex();
+ }
+
public void startAssignTo(final PipeRealtimeDataRegionExtractor extractor) {
matcher.register(extractor);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
similarity index 55%
rename from
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
rename to
iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index 893b832b4cb..da2cde90667 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTimePartitionProgressIndexKeeper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -21,61 +21,44 @@ package
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner;
import org.apache.iotdb.commons.consensus.index.ProgressIndex;
-import org.apache.tsfile.utils.Pair;
-
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
-public class PipeTimePartitionProgressIndexKeeper {
+public class PipeTsFileEpochProgressIndexKeeper {
- // data region id -> (time partition id, <max progress index, is valid>)
- private final Map<String, Map<Long, Pair<ProgressIndex, Boolean>>>
progressIndexKeeper =
+ // data region id -> (tsFile path, max progress index)
+ private final Map<String, Map<String, ProgressIndex>> progressIndexKeeper =
new ConcurrentHashMap<>();
public synchronized void updateProgressIndex(
- final String dataRegionId, final long timePartitionId, final
ProgressIndex progressIndex) {
+ final String dataRegionId, final String tsFileName, final ProgressIndex
progressIndex) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
.compute(
- timePartitionId,
- (k, v) -> {
- if (v == null) {
- return new Pair<>(progressIndex, true);
- }
- return new Pair<>(
-
v.getLeft().updateToMinimumEqualOrIsAfterProgressIndex(progressIndex), true);
- });
+ tsFileName,
+ (k, v) ->
+ v == null
+ ? progressIndex
+ :
v.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
}
public synchronized void eliminateProgressIndex(
- final String dataRegionId, final long timePartitionId, final
ProgressIndex progressIndex) {
+ final String dataRegionId, final String filePath) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .compute(
- timePartitionId,
- (k, v) -> {
- if (v == null) {
- return null;
- }
- if (v.getRight() && !v.getLeft().isAfter(progressIndex)) {
- return new Pair<>(v.getLeft(), false);
- }
- return v;
- });
+ .remove(filePath);
}
public synchronized boolean isProgressIndexAfterOrEquals(
- final String dataRegionId, final long timePartitionId, final
ProgressIndex progressIndex) {
+ final String dataRegionId, final String tsFilePath, final ProgressIndex
progressIndex) {
return progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
.entrySet()
.stream()
- .filter(entry -> entry.getKey() != timePartitionId)
+ .filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
.map(Entry::getValue)
- .filter(pair -> pair.right)
- .map(Pair::getLeft)
.filter(Objects::nonNull)
.anyMatch(index -> !index.isAfter(progressIndex));
}
@@ -84,19 +67,19 @@ public class PipeTimePartitionProgressIndexKeeper {
private static class PipeTimePartitionProgressIndexKeeperHolder {
- private static final PipeTimePartitionProgressIndexKeeper INSTANCE =
- new PipeTimePartitionProgressIndexKeeper();
+ private static final PipeTsFileEpochProgressIndexKeeper INSTANCE =
+ new PipeTsFileEpochProgressIndexKeeper();
private PipeTimePartitionProgressIndexKeeperHolder() {
// empty constructor
}
}
- public static PipeTimePartitionProgressIndexKeeper getInstance() {
- return
PipeTimePartitionProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
+ public static PipeTsFileEpochProgressIndexKeeper getInstance() {
+ return
PipeTsFileEpochProgressIndexKeeper.PipeTimePartitionProgressIndexKeeperHolder.INSTANCE;
}
- private PipeTimePartitionProgressIndexKeeper() {
+ private PipeTsFileEpochProgressIndexKeeper() {
// empty constructor
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
index 919ed20483b..c2db4c77c86 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/realtime/epoch/TsFileEpoch.java
@@ -34,20 +34,20 @@ public class TsFileEpoch {
dataRegionExtractor2State;
private final AtomicLong insertNodeMinTime;
- public TsFileEpoch(String filePath) {
+ public TsFileEpoch(final String filePath) {
this.filePath = filePath;
this.dataRegionExtractor2State = new ConcurrentHashMap<>();
this.insertNodeMinTime = new AtomicLong(Long.MAX_VALUE);
}
- public TsFileEpoch.State getState(PipeRealtimeDataRegionExtractor extractor)
{
+ public TsFileEpoch.State getState(final PipeRealtimeDataRegionExtractor
extractor) {
return dataRegionExtractor2State
.computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
.get();
}
public void migrateState(
- PipeRealtimeDataRegionExtractor extractor, TsFileEpochStateMigrator
visitor) {
+ final PipeRealtimeDataRegionExtractor extractor, final
TsFileEpochStateMigrator visitor) {
dataRegionExtractor2State
.computeIfAbsent(extractor, o -> new AtomicReference<>(State.EMPTY))
.getAndUpdate(visitor::migrate);
@@ -60,7 +60,7 @@ public class TsFileEpoch {
.setRecentProcessedTsFileEpochState(extractor.getTaskID(),
state.get()));
}
- public void updateInsertNodeMinTime(long newComingMinTime) {
+ public void updateInsertNodeMinTime(final long newComingMinTime) {
insertNodeMinTime.updateAndGet(recordedMinTime ->
Math.min(recordedMinTime, newComingMinTime));
}
@@ -68,6 +68,10 @@ public class TsFileEpoch {
return insertNodeMinTime.get();
}
+ public String getFilePath() {
+ return filePath;
+ }
+
@Override
public String toString() {
return "TsFileEpoch{"
@@ -90,7 +94,7 @@ public class TsFileEpoch {
private final int id;
- State(int id) {
+ State(final int id) {
this.id = id;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 9525b57e75b..87ae50e872f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -28,7 +28,7 @@ import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.exception.load.PartitionViolationException;
-import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTimePartitionProgressIndexKeeper;
+import
org.apache.iotdb.db.pipe.extractor.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import org.apache.iotdb.db.schemaengine.schemaregion.utils.ResourceByPathUtils;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.utils.InsertionCompactionCandidateStatus;
@@ -1200,8 +1200,8 @@ public class TsFileResource {
? progressIndex
:
maxProgressIndex.updateToMinimumEqualOrIsAfterProgressIndex(progressIndex));
- PipeTimePartitionProgressIndexKeeper.getInstance()
- .updateProgressIndex(getDataRegionId(), getTimePartition(),
maxProgressIndex);
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .updateProgressIndex(getDataRegionId(), getTsFilePath(),
maxProgressIndex);
}
public void setProgressIndex(ProgressIndex progressIndex) {
@@ -1211,8 +1211,8 @@ public class TsFileResource {
maxProgressIndex = progressIndex;
- PipeTimePartitionProgressIndexKeeper.getInstance()
- .updateProgressIndex(getDataRegionId(), getTimePartition(),
maxProgressIndex);
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .updateProgressIndex(getDataRegionId(), getTsFilePath(),
maxProgressIndex);
}
public ProgressIndex getMaxProgressIndexAfterClose() throws
IllegalStateException {