This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new ae37192cee3 [To dev/1.3] Pipe: Avoided timeIndexes' memory occupation
for deleted tsFiles && Removed useless stale logic && Improved the memory
calculation of tsFileResource && Avoided the potential failure for permission
check when the source file is deleted (#16399) (#16415)
ae37192cee3 is described below
commit ae37192cee32548494789db0ad00eb08c41d4ed3
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 15 14:09:14 2025 +0800
[To dev/1.3] Pipe: Avoided timeIndexes' memory occupation for deleted
tsFiles && Removed useless stale logic && Improved the memory calculation of
tsFileResource && Avoided the potential failure for permission check when the
source file is deleted (#16399) (#16415)
* Pipe: Avoided timeIndexes' memory occupation for deleted tsFiles &&
Removed useless stale logic && Improved the memory calculation of
tsFileResource && Avoided the potential failure for permission check when the
source file is deleted (#16399)
* fix
* partial
* reduce unnecessary
* Fix
* may fix
* Fix
* bishop
* fix
* nonnull
* fix
* enrich
* fix
* fix
* comp
* fix
* fix
* Refactor
* fix
---
.../agent/task/connection/PipeEventCollector.java | 7 +-----
.../common/tsfile/PipeTsFileInsertionEvent.java | 4 +++-
.../PipeHistoricalDataRegionTsFileSource.java | 10 ++++-----
.../dataregion/tsfile/TsFileResource.java | 25 ++++++++++++++++------
.../compaction/AbstractCompactionTest.java | 2 ++
5 files changed, 30 insertions(+), 18 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 9edb53ed932..bf9e658f565 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -129,12 +129,7 @@ public class PipeEventCollector implements EventCollector {
return;
}
- if (skipParsing) {
- collectEvent(sourceEvent);
- return;
- }
-
- if (!forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
+ if (skipParsing || !forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
collectEvent(sourceEvent);
return;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 311190f0181..1e63a2d4af3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -86,6 +86,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
protected long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
protected volatile ProgressIndex overridingProgressIndex;
+ private Set<String> tableNames;
public PipeTsFileInsertionEvent(final TsFileResource resource, final boolean isLoaded) {
// The modFile must be copied before the event is assigned to the listening pipes
@@ -122,6 +123,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
this.isGeneratedByPipe = resource.isGeneratedByPipe();
this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
+ this.tableNames = tableNames;
this.dataContainer = new AtomicReference<>(null);
@@ -394,7 +396,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
deviceID ->
pipePattern.mayOverlapWithDevice(((PlainDeviceID) deviceID).toStringID()));
} catch (final Exception e) {
- LOGGER.warn(
+ LOGGER.info(
"Pipe {}: failed to get devices from TsFile {}, extract it anyway",
pipeName,
resource.getTsFilePath(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index e3278977647..0ed563d52b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -111,7 +111,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
private PipePattern pipePattern;
private boolean isDbNameCoveredByPattern = false;
- private boolean isHistoricalExtractorEnabled = false;
+ private boolean isHistoricalSourceEnabled = false;
private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event time
private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
@@ -174,7 +174,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
EXTRACTOR_START_TIME_KEY,
SOURCE_END_TIME_KEY,
EXTRACTOR_END_TIME_KEY)) {
- isHistoricalExtractorEnabled = true;
+ isHistoricalSourceEnabled = true;
try {
historicalDataExtractionStartTime =
@@ -214,7 +214,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
// enabled, the pipe will lose some historical data.
// 2. User may set the EXTRACTOR_HISTORY_START_TIME and EXTRACTOR_HISTORY_END_TIME without
// enabling the historical data extraction, which may affect the realtime data extraction.
- isHistoricalExtractorEnabled =
+ isHistoricalSourceEnabled =
parameters.getBooleanOrDefault(
SystemConstant.RESTART_KEY,
SystemConstant.RESTART_DEFAULT_VALUE)
|| parameters.getBooleanOrDefault(
@@ -382,7 +382,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
.peek(originalResourceList::add)
.filter(
resource ->
- isHistoricalExtractorEnabled
+ isHistoricalSourceEnabled
&&
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
@@ -408,7 +408,7 @@ public class PipeHistoricalDataRegionTsFileSource implements PipeHistoricalDataR
.peek(originalResourceList::add)
.filter(
resource ->
- isHistoricalExtractorEnabled
+ isHistoricalSourceEnabled
&&
// Some resource is marked as deleted but not removed from the list.
!resource.isDeleted()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index c43b2265b17..1b4b32b1283 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -107,7 +107,9 @@ public class TsFileResource {
protected TsFileResource next;
/** time index */
- private ITimeIndex timeIndex;
+ private volatile ITimeIndex timeIndex;
+
+ private final AtomicReference<Boolean> isEmpty = new AtomicReference<>();
@SuppressWarnings("squid:S3077")
private volatile ModificationFile modFile;
@@ -121,11 +123,11 @@ public class TsFileResource {
/** used for check whether this file has internal unsorted data in compaction selection */
private TsFileRepairStatus tsFileRepairStatus = TsFileRepairStatus.NORMAL;
- private TsFileLock tsFileLock = new TsFileLock();
+ private final TsFileLock tsFileLock = new TsFileLock();
private boolean isSeq;
- private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+ private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
private DataRegion.SettleTsFileCallBack settleTsFileCallBack;
@@ -647,6 +649,10 @@ public class TsFileResource {
*/
public boolean remove() {
forceMarkDeleted();
+ // To release the memory occupied by pipe if held by it
+ // Note that pipe can safely handle the case that the time index does not exist
+ isEmpty();
+ degradeTimeIndex();
try {
fsFactory.deleteIfExists(file);
fsFactory.deleteIfExists(
@@ -935,13 +941,18 @@ public class TsFileResource {
* @return resource map size
*/
public long calculateRamSize() {
+ final ProgressIndex progressIndex = maxProgressIndex.get();
if (timeIndex.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE) {
- return INSTANCE_SIZE + timeIndex.calculateRamSize();
+ return INSTANCE_SIZE
+ + timeIndex.calculateRamSize()
+ + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
}
if (deviceTimeIndexRamSize == 0) {
deviceTimeIndexRamSize = timeIndex.calculateRamSize();
}
- return INSTANCE_SIZE + deviceTimeIndexRamSize;
+ return INSTANCE_SIZE
+ + deviceTimeIndexRamSize
+ + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
}
// used for compaction
@@ -1219,7 +1230,9 @@ public class TsFileResource {
}
public boolean isEmpty() {
- return getFileStartTime() == Long.MAX_VALUE && getFileEndTime() == Long.MIN_VALUE;
+ isEmpty.compareAndSet(
+ null, getFileStartTime() == Long.MAX_VALUE && getFileEndTime() == Long.MIN_VALUE);
+ return isEmpty.get();
}
public String getDatabaseName() {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
index 2146808222c..d5094fa6688 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
@@ -44,6 +44,7 @@ import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
+import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
import org.apache.iotdb.db.utils.EnvironmentUtils;
import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -504,6 +505,7 @@ public class AbstractCompactionTest {
}
registeredTimePartitionDirs.clear();
tsFileManager.clear();
+ TsFileResourceManager.getInstance().clear();
}
private void removeFiles() throws IOException {