This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 9c6b90a9812 [Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes (#17472)
9c6b90a9812 is described below
commit 9c6b90a9812eeb2b95c7acd3faa6df7fc49dd2de
Author: Peng Junzhi <[email protected]>
AuthorDate: Fri Apr 17 10:56:09 2026 +0800
[Pipe] Deduplicate historical tsfile events in IoTConsensusV2 pipes (#17472)
* Pipe: deduplicate historical tsfile events per task scope
* Pipe: address historical tsfile dedup review comments
* spotless
* Pipe: fix dedup scope cleanup and historical skip loop
* spotless
* Refine historical tsfile dedup supply semantics
* spotless
---
.../tsfile/PipeCompactedTsFileInsertionEvent.java | 5 +-
.../common/tsfile/PipeTsFileInsertionEvent.java | 57 ++++---
...istoricalDataRegionTsFileAndDeletionSource.java | 87 +++++++++--
.../PipeRealtimeDataRegionHybridSource.java | 9 +-
.../realtime/PipeRealtimeDataRegionSource.java | 10 +-
.../PipeRealtimeDataRegionTsFileSource.java | 7 +-
.../realtime/assigner/PipeDataRegionAssigner.java | 1 +
.../PipeTsFileEpochProgressIndexKeeper.java | 78 +++++++--
.../pipe/event/PipeTsFileInsertionEventTest.java | 99 ++++++++++++
...ricalDataRegionTsFileAndDeletionSourceTest.java | 174 +++++++++++++++++++++
.../PipeTsFileEpochProgressIndexKeeperTest.java | 117 ++++++++++++++
11 files changed, 585 insertions(+), 59 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index 343c8d89329..95ff0a25373 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -91,6 +91,7 @@ public class PipeCompactedTsFileInsertionEvent extends
PipeTsFileInsertionEvent
// init fields of PipeTsFileInsertionEvent
flushPointCount = bindFlushPointCount(originalEvents);
overridingProgressIndex = bindOverridingProgressIndex(originalEvents);
+ bindTsFileDedupScopeID(anyOfOriginalEvents.getTsFileDedupScopeID());
}
private static boolean bindIsWithMod(Set<PipeTsFileInsertionEvent>
originalEvents) {
@@ -184,10 +185,10 @@ public class PipeCompactedTsFileInsertionEvent extends
PipeTsFileInsertionEvent
@Override
public void eliminateProgressIndex() {
- if (Objects.isNull(overridingProgressIndex)) {
+ if (Objects.isNull(overridingProgressIndex) &&
Objects.nonNull(getTsFileDedupScopeID())) {
for (final String originFilePath : originFilePaths) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(dataRegionId, pipeName, originFilePath);
+ .eliminateProgressIndex(dataRegionId, getTsFileDedupScopeID(),
originFilePath);
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 1505e15996f..adddc9d7ce5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -99,6 +99,7 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
protected volatile ProgressIndex overridingProgressIndex;
private Set<String> tableNames;
+ private String tsFileDedupScopeID;
// This is set to check the tsFile paths by privilege
private Map<IDeviceID, String[]> treeSchemaMap;
@@ -398,13 +399,26 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
}
public void eliminateProgressIndex() {
- if (Objects.isNull(overridingProgressIndex) && Objects.nonNull(resource)) {
+ if (Objects.isNull(overridingProgressIndex)
+ && Objects.nonNull(resource)
+ && Objects.nonNull(tsFileDedupScopeID)) {
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(
- Integer.parseInt(resource.getDataRegionId()), pipeName,
resource.getTsFilePath());
+ Integer.parseInt(resource.getDataRegionId()),
+ tsFileDedupScopeID,
+ resource.getTsFilePath());
}
}
+ public PipeTsFileInsertionEvent bindTsFileDedupScopeID(final String
tsFileDedupScopeID) {
+ this.tsFileDedupScopeID = tsFileDedupScopeID;
+ return this;
+ }
+
+ public String getTsFileDedupScopeID() {
+ return tsFileDedupScopeID;
+ }
+
@Override
public PipeTsFileInsertionEvent
shallowCopySelfAndBindPipeTaskMetaForProgressReport(
final String pipeName,
@@ -419,25 +433,26 @@ public class PipeTsFileInsertionEvent extends
PipeInsertionEvent
final long startTime,
final long endTime) {
return new PipeTsFileInsertionEvent(
- getRawIsTableModelEvent(),
- getSourceDatabaseNameFromDataRegion(),
- resource,
- tsFile,
- isWithMod,
- isLoaded,
- isGeneratedByHistoricalExtractor,
- tableNames,
- pipeName,
- creationTime,
- pipeTaskMeta,
- treePattern,
- tablePattern,
- userId,
- userName,
- cliHostname,
- skipIfNoPrivileges,
- startTime,
- endTime);
+ getRawIsTableModelEvent(),
+ getSourceDatabaseNameFromDataRegion(),
+ resource,
+ tsFile,
+ isWithMod,
+ isLoaded,
+ isGeneratedByHistoricalExtractor,
+ tableNames,
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ treePattern,
+ tablePattern,
+ userId,
+ userName,
+ cliHostname,
+ skipIfNoPrivileges,
+ startTime,
+ endTime)
+ .bindTsFileDedupScopeID(tsFileDedupScopeID);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 66f8d48ce28..2ca283b1395 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -50,6 +50,7 @@ import
org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import
org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor;
import org.apache.iotdb.db.pipe.resource.PipeDataNodeResourceManager;
import org.apache.iotdb.db.pipe.source.dataregion.DataRegionListeningFilter;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import org.apache.iotdb.db.storageengine.StorageEngine;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
@@ -124,6 +125,7 @@ public class PipeHistoricalDataRegionTsFileAndDeletionSource
private String pipeName;
private long creationTime;
+ private String tsFileDedupScopeID;
private PipeTaskMeta pipeTaskMeta;
private ProgressIndex startIndex;
@@ -320,6 +322,14 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
}
dataRegionId = environment.getRegionId();
+ tsFileDedupScopeID =
+ pipeName
+ + "_"
+ + dataRegionId
+ + "_"
+ + creationTime
+ + "_"
+ + Integer.toHexString(System.identityHashCode(environment));
treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
tablePattern =
TablePattern.parsePipePatternFromSourceParameters(parameters);
@@ -807,11 +817,15 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
final PersistentResource resource = pendingQueue.poll();
if (resource == null) {
return supplyTerminateEvent();
- } else if (resource instanceof TsFileResource) {
- return supplyTsFileEvent((TsFileResource) resource);
- } else {
- return supplyDeletionEvent((DeletionResource) resource);
}
+
+ if (resource instanceof TsFileResource) {
+ final TsFileResource tsFileResource = (TsFileResource) resource;
+ return consumeSkippedHistoricalTsFileEventIfNecessary(tsFileResource)
+ ? supplyProgressReportEvent(tsFileResource.getMaxProgressIndex())
+ : supplyTsFileEvent(tsFileResource);
+ }
+ return supplyDeletionEvent((DeletionResource) resource);
}
private Event supplyTerminateEvent() {
@@ -834,20 +848,54 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
return terminateEvent;
}
- private Event supplyTsFileEvent(final TsFileResource resource) {
- if (!filteredTsFileResources2TableNames.containsKey(resource)) {
- final ProgressReportEvent progressReportEvent =
- new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
- progressReportEvent.bindProgressIndex(resource.getMaxProgressIndex());
- final boolean isReferenceCountIncreased =
- progressReportEvent.increaseReferenceCount(
- PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
- if (!isReferenceCountIncreased) {
+ protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(final
TsFileResource resource) {
+ if (!filteredTsFileResources2TableNames.containsKey(resource)
+ || !shouldSkipHistoricalTsFileEvent(resource)) {
+ return false;
+ }
+
+ filteredTsFileResources2TableNames.remove(resource);
+ LOGGER.info(
+ "Pipe {}@{}: skip historical tsfile {} because realtime source in
current task {} has already captured it.",
+ pipeName,
+ dataRegionId,
+ resource.getTsFilePath(),
+ tsFileDedupScopeID);
+ try {
+ return true;
+ } finally {
+ try {
+ PipeDataNodeResourceManager.tsfile()
+ .unpinTsFileResource(resource, shouldTransferModFile, pipeName);
+ } catch (final IOException e) {
LOGGER.warn(
- "The reference count of the event {} cannot be increased, skipping
it.",
- progressReportEvent);
+ "Pipe {}@{}: failed to unpin skipped historical TsFileResource,
original path: {}",
+ pipeName,
+ dataRegionId,
+ resource.getTsFilePath(),
+ e);
}
- return isReferenceCountIncreased ? progressReportEvent : null;
+ }
+ }
+
+ protected Event supplyProgressReportEvent(final ProgressIndex progressIndex)
{
+ final ProgressReportEvent progressReportEvent =
+ new ProgressReportEvent(pipeName, creationTime, pipeTaskMeta);
+ progressReportEvent.bindProgressIndex(progressIndex);
+ final boolean isReferenceCountIncreased =
+ progressReportEvent.increaseReferenceCount(
+ PipeHistoricalDataRegionTsFileAndDeletionSource.class.getName());
+ if (!isReferenceCountIncreased) {
+ LOGGER.warn(
+ "The reference count of the event {} cannot be increased, skipping
it.",
+ progressReportEvent);
+ }
+ return isReferenceCountIncreased ? progressReportEvent : null;
+ }
+
+ protected Event supplyTsFileEvent(final TsFileResource resource) {
+ if (!filteredTsFileResources2TableNames.containsKey(resource)) {
+ return supplyProgressReportEvent(resource.getMaxProgressIndex());
}
final PipeTsFileInsertionEvent event =
@@ -916,6 +964,13 @@ public class
PipeHistoricalDataRegionTsFileAndDeletionSource
}
}
+ private boolean shouldSkipHistoricalTsFileEvent(final TsFileResource
resource) {
+ return pipeName.startsWith(PipeStaticMeta.CONSENSUS_PIPE_PREFIX)
+ && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+ && PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .containsTsFile(dataRegionId, tsFileDedupScopeID,
resource.getTsFilePath());
+ }
+
private Event supplyDeletionEvent(final DeletionResource deletionResource) {
final PipeDeleteDataNodeEvent event =
new PipeDeleteDataNodeEvent(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
index c9e3f35288a..97b6d54fde5 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionHybridSource.java
@@ -83,7 +83,8 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
if (canNotUseTabletAnymore(event)) {
event.getTsFileEpoch().migrateState(this, curState ->
TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexKeeper.getInstance()
- .registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
+ .registerProgressIndex(
+ dataRegionId, getTsFileDedupScopeID(),
event.getTsFileEpoch().getResource());
} else {
event
.getTsFileEpoch()
@@ -156,7 +157,8 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
case USING_TABLET:
// If the state is USING_TABLET, discard the event
PipeTsFileEpochProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
+ .eliminateProgressIndex(
+ dataRegionId, getTsFileDedupScopeID(),
event.getTsFileEpoch().getFilePath());
event.decreaseReferenceCount(PipeRealtimeDataRegionHybridSource.class.getName(),
false);
return;
case EMPTY:
@@ -283,7 +285,8 @@ public class PipeRealtimeDataRegionHybridSource extends
PipeRealtimeDataRegionSo
PipeDataNodeAgent.runtime()
.report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
PipeTsFileEpochProgressIndexKeeper.getInstance()
- .eliminateProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getFilePath());
+ .eliminateProgressIndex(
+ dataRegionId, getTsFileDedupScopeID(),
event.getTsFileEpoch().getFilePath());
return null;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
index 37f1f0a74cb..2ab2061ce7e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionSource.java
@@ -133,6 +133,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
protected String pipeID;
private String taskID;
+ private String tsFileDedupScopeID;
protected long userId;
protected String userName;
protected String cliHostname;
@@ -226,6 +227,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
creationTime = environment.getCreationTime();
pipeID = pipeName + "_" + creationTime;
taskID = pipeName + "_" + dataRegionId + "_" + creationTime;
+ tsFileDedupScopeID = taskID + "_" +
Integer.toHexString(System.identityHashCode(environment));
treePattern = TreePattern.parsePipePatternFromSourceParameters(parameters);
tablePattern =
TablePattern.parsePipePatternFromSourceParameters(parameters);
@@ -322,6 +324,8 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
if (dataRegionId >= 0) {
PipeInsertionDataNodeListener.getInstance().stopListenAndAssign(dataRegionId,
this);
PipeTimePartitionListener.getInstance().stopListen(dataRegionId, this);
+ PipeTsFileEpochProgressIndexKeeper.getInstance()
+ .clearProgressIndex(dataRegionId, tsFileDedupScopeID);
}
synchronized (isClosed) {
@@ -580,7 +584,7 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
if (PipeTsFileEpochProgressIndexKeeper.getInstance()
.isProgressIndexAfterOrEquals(
dataRegionId,
- pipeName,
+ tsFileDedupScopeID,
event.getTsFileEpoch().getFilePath(),
getProgressIndex4RealtimeEvent(event))) {
event.skipReportOnCommit();
@@ -652,6 +656,10 @@ public abstract class PipeRealtimeDataRegionSource
implements PipeExtractor {
return taskID;
}
+ public final String getTsFileDedupScopeID() {
+ return tsFileDedupScopeID;
+ }
+
public void increaseExtractEpochSize() {
extractEpochSize.incrementAndGet();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
index 98bfb30391a..97c3138de7c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/PipeRealtimeDataRegionTsFileSource.java
@@ -52,7 +52,8 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
event.getTsFileEpoch().migrateState(this, state ->
TsFileEpoch.State.USING_TSFILE);
PipeTsFileEpochProgressIndexKeeper.getInstance()
- .registerProgressIndex(dataRegionId, pipeName,
event.getTsFileEpoch().getResource());
+ .registerProgressIndex(
+ dataRegionId, getTsFileDedupScopeID(),
event.getTsFileEpoch().getResource());
if (!(event.getEvent() instanceof TsFileInsertionEvent)) {
event.decreaseReferenceCount(PipeRealtimeDataRegionTsFileSource.class.getName(),
false);
@@ -104,7 +105,9 @@ public class PipeRealtimeDataRegionTsFileSource extends
PipeRealtimeDataRegionSo
.report(pipeTaskMeta, new
PipeRuntimeNonCriticalException(errorMessage));
PipeTsFileEpochProgressIndexKeeper.getInstance()
.eliminateProgressIndex(
- dataRegionId, pipeName,
realtimeEvent.getTsFileEpoch().getFilePath());
+ dataRegionId,
+ getTsFileDedupScopeID(),
+ realtimeEvent.getTsFileEpoch().getFilePath());
}
realtimeEvent.decreaseReferenceCount(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 9c7182f051c..bdeebde8938 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -190,6 +190,7 @@ public class PipeDataRegionAssigner implements Closeable {
if (innerEvent instanceof PipeTsFileInsertionEvent) {
final PipeTsFileInsertionEvent tsFileInsertionEvent =
(PipeTsFileInsertionEvent) innerEvent;
+
tsFileInsertionEvent.bindTsFileDedupScopeID(source.getTsFileDedupScopeID());
tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
index bf15dcdc547..aaf03f570e2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeper.java
@@ -31,36 +31,86 @@ import java.util.concurrent.ConcurrentHashMap;
public class PipeTsFileEpochProgressIndexKeeper {
- // data region id -> pipeName -> tsFile path -> max progress index
+ // data region id -> task scope id -> tsFile path -> max progress index
private final Map<Integer, Map<String, Map<String, TsFileResource>>>
progressIndexKeeper =
new ConcurrentHashMap<>();
public synchronized void registerProgressIndex(
- final int dataRegionId, final String pipeName, final TsFileResource
resource) {
+ final int dataRegionId, final String taskScopeID, final TsFileResource
resource) {
progressIndexKeeper
.computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(taskScopeID, k -> new ConcurrentHashMap<>())
.putIfAbsent(resource.getTsFilePath(), resource);
}
public synchronized void eliminateProgressIndex(
- final int dataRegionId, final @Nonnull String pipeName, final String
filePath) {
- progressIndexKeeper
- .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
- .remove(filePath);
+ final int dataRegionId, final @Nonnull String taskScopeID, final String
filePath) {
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return;
+ }
+
+ final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+ scopeProgressIndexKeeper.get(taskScopeID);
+ if (tsFileProgressIndexKeeper == null) {
+ return;
+ }
+
+ tsFileProgressIndexKeeper.remove(filePath);
+ if (tsFileProgressIndexKeeper.isEmpty()) {
+ scopeProgressIndexKeeper.remove(taskScopeID);
+ if (scopeProgressIndexKeeper.isEmpty()) {
+ progressIndexKeeper.remove(dataRegionId);
+ }
+ }
+ }
+
+ public synchronized void clearProgressIndex(
+ final int dataRegionId, final @Nonnull String taskScopeID) {
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return;
+ }
+
+ scopeProgressIndexKeeper.remove(taskScopeID);
+ if (scopeProgressIndexKeeper.isEmpty()) {
+ progressIndexKeeper.remove(dataRegionId);
+ }
+ }
+
+ public synchronized boolean containsTsFile(
+ final int dataRegionId, final @Nonnull String taskScopeID, final String
tsFilePath) {
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return false;
+ }
+
+ final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+ scopeProgressIndexKeeper.get(taskScopeID);
+ return tsFileProgressIndexKeeper != null &&
tsFileProgressIndexKeeper.containsKey(tsFilePath);
}
public synchronized boolean isProgressIndexAfterOrEquals(
final int dataRegionId,
- final String pipeName,
+ final String taskScopeID,
final String tsFilePath,
final ProgressIndex progressIndex) {
- return progressIndexKeeper
- .computeIfAbsent(dataRegionId, k -> new ConcurrentHashMap<>())
- .computeIfAbsent(pipeName, k -> new ConcurrentHashMap<>())
- .entrySet()
- .stream()
+ final Map<String, Map<String, TsFileResource>> scopeProgressIndexKeeper =
+ progressIndexKeeper.get(dataRegionId);
+ if (scopeProgressIndexKeeper == null) {
+ return false;
+ }
+
+ final Map<String, TsFileResource> tsFileProgressIndexKeeper =
+ scopeProgressIndexKeeper.get(taskScopeID);
+ if (tsFileProgressIndexKeeper == null) {
+ return false;
+ }
+
+ return tsFileProgressIndexKeeper.entrySet().stream()
.filter(entry -> !Objects.equals(entry.getKey(), tsFilePath))
.map(Entry::getValue)
.filter(Objects::nonNull)
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
index 5ba0843bf80..db5452e0b92 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
@@ -23,13 +23,17 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.audit.IAuditEntity;
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.conf.IoTDBConstant;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.agent.task.progress.CommitterKey;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import
org.apache.iotdb.db.pipe.event.common.tsfile.PipeCompactedTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
+import
org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
import
org.apache.iotdb.db.queryengine.plan.relational.security.TreeAccessCheckContext;
@@ -50,8 +54,11 @@ import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.Mockito;
import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
@@ -157,6 +164,98 @@ public class PipeTsFileInsertionEventTest {
}
}
+ @Test
+ public void testTsFileDedupScopeIdIsPreservedForCleanupAndCopy() throws
Exception {
+ final PipeTsFileEpochProgressIndexKeeper keeper =
+ PipeTsFileEpochProgressIndexKeeper.getInstance();
+ final int dataRegionId = 1;
+ final String scopeA = "scope-a";
+ final String scopeB = "scope-b";
+ final File tempDir =
Files.createTempDirectory("pipeTsFileDedupScope").toFile();
+
+ try {
+ final TsFileResource sourceResource =
+ createSpyTsFileResource(tempDir, "source.tsfile", 1L, dataRegionId);
+ keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource);
+ keeper.registerProgressIndex(dataRegionId, scopeB, sourceResource);
+
+ final PipeTsFileInsertionEvent sourceEvent =
+ new PipeTsFileInsertionEvent(
+ true,
+ "db",
+ sourceResource,
+ null,
+ true,
+ false,
+ false,
+ Collections.singleton("table"),
+ "pipe",
+ 1L,
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE)
+ .bindTsFileDedupScopeID(scopeA);
+
+ sourceEvent.eliminateProgressIndex();
+ Assert.assertFalse(
+ keeper.containsTsFile(dataRegionId, scopeA,
sourceResource.getTsFilePath()));
+ Assert.assertTrue(
+ keeper.containsTsFile(dataRegionId, scopeB,
sourceResource.getTsFilePath()));
+
+ keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource);
+ final PipeTsFileInsertionEvent copiedEvent =
+ sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ "pipe", 2L, null, null, null, null, null, null, true,
Long.MIN_VALUE, Long.MAX_VALUE);
+ Assert.assertEquals(scopeA, copiedEvent.getTsFileDedupScopeID());
+ copiedEvent.eliminateProgressIndex();
+ Assert.assertFalse(
+ keeper.containsTsFile(dataRegionId, scopeA,
sourceResource.getTsFilePath()));
+ Assert.assertTrue(
+ keeper.containsTsFile(dataRegionId, scopeB,
sourceResource.getTsFilePath()));
+
+ keeper.registerProgressIndex(dataRegionId, scopeA, sourceResource);
+ final TsFileResource compactedResource =
+ createSpyTsFileResource(tempDir, "compacted.tsfile", 2L,
dataRegionId);
+ final PipeCompactedTsFileInsertionEvent compactedEvent =
+ new PipeCompactedTsFileInsertionEvent(
+ new CommitterKey("pipe", 1L, dataRegionId, 0),
+ Collections.singleton(sourceEvent),
+ sourceEvent,
+ compactedResource,
+ true);
+ Assert.assertEquals(scopeA, compactedEvent.getTsFileDedupScopeID());
+ compactedEvent.eliminateProgressIndex();
+ Assert.assertFalse(
+ keeper.containsTsFile(dataRegionId, scopeA,
sourceResource.getTsFilePath()));
+ Assert.assertTrue(
+ keeper.containsTsFile(dataRegionId, scopeB,
sourceResource.getTsFilePath()));
+ } finally {
+ keeper.clearProgressIndex(dataRegionId, scopeA);
+ keeper.clearProgressIndex(dataRegionId, scopeB);
+ FileUtils.deleteFileOrDirectory(tempDir);
+ }
+ }
+
+ private TsFileResource createSpyTsFileResource(
+ final File tempDir, final String fileName, final long flushOrderId,
final int dataRegionId)
+ throws IOException {
+ final File file = new File(tempDir, fileName);
+ Assert.assertTrue(file.createNewFile());
+
+ final TsFileResource resource = new TsFileResource(file);
+ resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
+
+ final TsFileResource spyResource = Mockito.spy(resource);
+
Mockito.doReturn(String.valueOf(dataRegionId)).when(spyResource).getDataRegionId();
+ return spyResource;
+ }
+
static class TestAccessControl implements AccessControl {
@Override
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
new file mode 100644
index 00000000000..14f97ef79d6
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSourceTest.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.historical;
+
+import org.apache.iotdb.commons.pipe.datastructure.resource.PersistentResource;
+import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.pipe.api.event.Event;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.nio.file.Files;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class PipeHistoricalDataRegionTsFileAndDeletionSourceTest {
+
+  /**
+   * When the head of the pending queue is a TsFile that the source reports as skipped, the first
+   * {@code supply()} must return a {@link ProgressReportEvent} without calling the per-file supply
+   * step, and the second {@code supply()} must hand the next TsFile to that step.
+   */
+  @Test
+  public void testSupplyReturnsProgressReportEventAfterSkippingDuplicateHistoricalTsFile()
+      throws Exception {
+    final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+    final Event nextEvent = new Event() {};
+    final File workDir = Files.createTempDirectory("pipeHistoricalSkipDuplicate").toFile();
+
+    try {
+      final TsFileResource duplicate = createTsFileResource(workDir, "skip.tsfile");
+      final TsFileResource following = createTsFileResource(workDir, "next.tsfile");
+
+      source.setSkippedTsFilePaths(duplicate.getTsFilePath());
+      source.setSuppliedEvent(nextEvent);
+      markStartedWithPendingQueue(source, duplicate, following);
+
+      // First call: the duplicate is consumed and only a progress report comes out.
+      Assert.assertTrue(source.supply() instanceof ProgressReportEvent);
+      Assert.assertEquals(
+          Arrays.asList(duplicate.getTsFilePath()), source.getConsumedSkippedTsFilePaths());
+      Assert.assertTrue(source.getSuppliedTsFiles().isEmpty());
+      Assert.assertEquals(1, source.getPendingQueueSize());
+
+      // Second call: the remaining TsFile goes through the regular supply path.
+      Assert.assertSame(nextEvent, source.supply());
+      Assert.assertEquals(Arrays.asList(following.getTsFilePath()), source.getSuppliedTsFiles());
+    } finally {
+      FileUtils.deleteFileOrDirectory(workDir);
+    }
+  }
+
+  /**
+   * A {@code null} produced for a TsFile that was not skipped must be returned from {@code
+   * supply()} as-is, leaving the next TsFile in the pending queue.
+   */
+  @Test
+  public void testSupplyDoesNotSwallowNonSkippedNullTsFileEvent() throws Exception {
+    final TestablePipeHistoricalDataRegionTsFileAndDeletionSource source =
+        new TestablePipeHistoricalDataRegionTsFileAndDeletionSource();
+    final File workDir = Files.createTempDirectory("pipeHistoricalNullSemantics").toFile();
+
+    try {
+      final TsFileResource first = createTsFileResource(workDir, "first.tsfile");
+      final TsFileResource second = createTsFileResource(workDir, "second.tsfile");
+
+      // No skipped paths are configured, so every TsFile reaches supplyTsFileEvent.
+      source.setSuppliedEvent(null);
+      markStartedWithPendingQueue(source, first, second);
+
+      Assert.assertNull(source.supply());
+      Assert.assertEquals(Arrays.asList(first.getTsFilePath()), source.getSuppliedTsFiles());
+      Assert.assertEquals(1, source.getPendingQueueSize());
+    } finally {
+      FileUtils.deleteFileOrDirectory(workDir);
+    }
+  }
+
+  /** Creates an empty file named {@code fileName} under {@code tempDir} and wraps it. */
+  private static TsFileResource createTsFileResource(final File tempDir, final String fileName)
+      throws IOException {
+    final File tsFile = new File(tempDir, fileName);
+    final boolean created = tsFile.createNewFile();
+    Assert.assertTrue(created);
+    return new TsFileResource(tsFile);
+  }
+
+  /**
+   * Sets the private {@code hasBeenStarted} flag to {@code true} and replaces the private {@code
+   * pendingQueue} with a new deque holding {@code resources} in the given order.
+   */
+  private static void markStartedWithPendingQueue(
+      final PipeHistoricalDataRegionTsFileAndDeletionSource source,
+      final PersistentResource... resources)
+      throws ReflectiveOperationException {
+    setPrivateField(source, "hasBeenStarted", true);
+    setPrivateField(
+        source, "pendingQueue", new ArrayDeque<PersistentResource>(Arrays.asList(resources)));
+  }
+
+  /** Writes {@code value} into a field declared on the production source class. */
+  private static void setPrivateField(
+      final PipeHistoricalDataRegionTsFileAndDeletionSource source,
+      final String fieldName,
+      final Object value)
+      throws ReflectiveOperationException {
+    final Class<PipeHistoricalDataRegionTsFileAndDeletionSource> owner =
+        PipeHistoricalDataRegionTsFileAndDeletionSource.class;
+    final Field declared = owner.getDeclaredField(fieldName);
+    declared.setAccessible(true);
+    declared.set(source, value);
+  }
+
+  /**
+   * Test double that replaces the skip decision and the per-file supply step with in-memory
+   * bookkeeping, so tests can choose which paths are skipped and observe which paths were handled.
+   */
+  private static class TestablePipeHistoricalDataRegionTsFileAndDeletionSource
+      extends PipeHistoricalDataRegionTsFileAndDeletionSource {
+
+    // Paths for which consumeSkippedHistoricalTsFileEventIfNecessary answers true.
+    private final Set<String> skippedTsFilePaths = new HashSet<>();
+    // Paths that were reported as skipped, in call order.
+    private final List<String> consumedSkippedTsFilePaths = new ArrayList<>();
+    // Paths passed to supplyTsFileEvent, in call order.
+    private final List<String> suppliedTsFiles = new ArrayList<>();
+    // Value returned by every supplyTsFileEvent call; may be null.
+    private Event suppliedEvent;
+
+    private void setSkippedTsFilePaths(final String... paths) {
+      skippedTsFilePaths.clear();
+      for (final String path : paths) {
+        skippedTsFilePaths.add(path);
+      }
+    }
+
+    private void setSuppliedEvent(final Event event) {
+      suppliedEvent = event;
+    }
+
+    private List<String> getConsumedSkippedTsFilePaths() {
+      return consumedSkippedTsFilePaths;
+    }
+
+    private List<String> getSuppliedTsFiles() {
+      return suppliedTsFiles;
+    }
+
+    /** Reads the size of the private {@code pendingQueue} directly via reflection. */
+    @Override
+    public int getPendingQueueSize() {
+      final Object queue;
+      try {
+        final Field queueField =
+            PipeHistoricalDataRegionTsFileAndDeletionSource.class.getDeclaredField("pendingQueue");
+        queueField.setAccessible(true);
+        queue = queueField.get(this);
+      } catch (final ReflectiveOperationException e) {
+        throw new AssertionError(e);
+      }
+      return ((ArrayDeque<?>) queue).size();
+    }
+
+    @Override
+    protected boolean consumeSkippedHistoricalTsFileEventIfNecessary(
+        final TsFileResource resource) {
+      final String path = resource.getTsFilePath();
+      final boolean skip = skippedTsFilePaths.contains(path);
+      if (skip) {
+        consumedSkippedTsFilePaths.add(path);
+      }
+      return skip;
+    }
+
+    @Override
+    protected Event supplyTsFileEvent(final TsFileResource resource) {
+      suppliedTsFiles.add(resource.getTsFilePath());
+      return suppliedEvent;
+    }
+  }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
new file mode 100644
index 00000000000..4d27fff5d57
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeTsFileEpochProgressIndexKeeperTest.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner;
+
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+
+public class PipeTsFileEpochProgressIndexKeeperTest {
+
+  private static final int DATA_REGION_ID = 1;
+  private static final String TASK_SCOPE_A = "task-scope-a";
+  private static final String TASK_SCOPE_B = "task-scope-b";
+
+  // Shared instance from getInstance(); every scope a test registers into is cleared in tearDown.
+  private final PipeTsFileEpochProgressIndexKeeper keeper =
+      PipeTsFileEpochProgressIndexKeeper.getInstance();
+
+  private File tempDir;
+
+  @Before
+  public void setUp() throws IOException {
+    tempDir = Files.createTempDirectory("pipeTsFileEpochProgressIndexKeeper").toFile();
+  }
+
+  /**
+   * Clears both task scopes and removes the temporary directory. Deletion runs in {@code finally}
+   * so a failure while clearing the keeper does not leak the directory. It is skipped when {@link
+   * #setUp()} failed before {@code tempDir} was assigned, so the real setup failure is not masked.
+   */
+  @After
+  public void tearDown() {
+    try {
+      keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+      keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_B);
+    } finally {
+      if (tempDir != null) {
+        FileUtils.deleteFileOrDirectory(tempDir);
+        tempDir = null;
+      }
+    }
+  }
+
+  /** A TsFile registered under scope A is reported by containsTsFile for A but not for B. */
+  @Test
+  public void testDuplicateTsFileLookupIsScopedByTaskInstance() throws IOException {
+    final TsFileResource resource = createTsFileResource("shared.tsfile", 1L);
+
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, resource);
+
+    Assert.assertTrue(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, resource.getTsFilePath()));
+    Assert.assertFalse(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, resource.getTsFilePath()));
+  }
+
+  /**
+   * With two TsFiles registered under scope A, the progress-index check returns {@code true} for
+   * scope A and {@code false} for scope B, where nothing was registered.
+   */
+  @Test
+  public void testProgressIndexCheckDoesNotLeakAcrossTaskScopes() throws IOException {
+    keeper.registerProgressIndex(
+        DATA_REGION_ID, TASK_SCOPE_A, createTsFileResource("1-1-0-0.tsfile", 1L));
+
+    final TsFileResource comparedResource = createTsFileResource("1-2-0-0.tsfile", 2L);
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, comparedResource);
+
+    Assert.assertTrue(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_A,
+            comparedResource.getTsFilePath(),
+            new SimpleProgressIndex(1, 2L)));
+    Assert.assertFalse(
+        keeper.isProgressIndexAfterOrEquals(
+            DATA_REGION_ID,
+            TASK_SCOPE_B,
+            comparedResource.getTsFilePath(),
+            new SimpleProgressIndex(1, 2L)));
+  }
+
+  /** Clearing scope A drops its TsFile while the TsFile registered under scope B remains. */
+  @Test
+  public void testClearProgressIndexOnlyRemovesTargetTaskScope() throws IOException {
+    final TsFileResource scopeAResource = createTsFileResource("scope-a.tsfile", 1L);
+    final TsFileResource scopeBResource = createTsFileResource("scope-b.tsfile", 1L);
+
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_A, scopeAResource);
+    keeper.registerProgressIndex(DATA_REGION_ID, TASK_SCOPE_B, scopeBResource);
+
+    keeper.clearProgressIndex(DATA_REGION_ID, TASK_SCOPE_A);
+
+    Assert.assertFalse(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_A, scopeAResource.getTsFilePath()));
+    Assert.assertTrue(
+        keeper.containsTsFile(DATA_REGION_ID, TASK_SCOPE_B, scopeBResource.getTsFilePath()));
+  }
+
+  /**
+   * Creates an empty file {@code fileName} in {@code tempDir} and wraps it in a {@link
+   * TsFileResource} whose progress index is {@code SimpleProgressIndex(1, flushOrderId)}.
+   */
+  private TsFileResource createTsFileResource(final String fileName, final long flushOrderId)
+      throws IOException {
+    final File file = new File(tempDir, fileName);
+    Assert.assertTrue(file.createNewFile());
+
+    final TsFileResource resource = new TsFileResource(file);
+    resource.updateProgressIndex(new SimpleProgressIndex(1, flushOrderId));
+    return resource;
+  }
+}