This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new ae37192cee3 [To dev/1.3] Pipe: Avoided timeIndexes' memory occupation for deleted tsFiles && Removed useless stale logic && Improved the memory calculation of tsFileResource && Avoided the potential failure for permission check when the source file is deleted (#16399) (#16415)
ae37192cee3 is described below

commit ae37192cee32548494789db0ad00eb08c41d4ed3
Author: Caideyipi <[email protected]>
AuthorDate: Mon Sep 15 14:09:14 2025 +0800

    [To dev/1.3] Pipe: Avoided timeIndexes' memory occupation for deleted tsFiles && Removed useless stale logic && Improved the memory calculation of tsFileResource && Avoided the potential failure for permission check when the source file is deleted (#16399) (#16415)
    
    * Pipe: Avoided timeIndexes' memory occupation for deleted tsFiles && Removed useless stale logic && Improved the memory calculation of tsFileResource && Avoided the potential failure for permission check when the source file is deleted (#16399)
    
    * fix
    
    * partial
    
    * reduce unnecessary
    
    * Fix
    
    * may fix
    
    * Fix
    
    * bishop
    
    * fix
    
    * nonnull
    
    * fix
    
    * enrich
    
    * fix
    
    * fix
    
    * comp
    
    * fix
    
    * fix
    
    * Refactor
    
    * fix
---
 .../agent/task/connection/PipeEventCollector.java  |  7 +-----
 .../common/tsfile/PipeTsFileInsertionEvent.java    |  4 +++-
 .../PipeHistoricalDataRegionTsFileSource.java      | 10 ++++-----
 .../dataregion/tsfile/TsFileResource.java          | 25 ++++++++++++++++------
 .../compaction/AbstractCompactionTest.java         |  2 ++
 5 files changed, 30 insertions(+), 18 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
index 9edb53ed932..bf9e658f565 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/task/connection/PipeEventCollector.java
@@ -129,12 +129,7 @@ public class PipeEventCollector implements EventCollector {
       return;
     }
 
-    if (skipParsing) {
-      collectEvent(sourceEvent);
-      return;
-    }
-
-    if (!forceTabletFormat && canSkipParsing4TsFileEvent(sourceEvent)) {
+    if (skipParsing || !forceTabletFormat && 
canSkipParsing4TsFileEvent(sourceEvent)) {
       collectEvent(sourceEvent);
       return;
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 311190f0181..1e63a2d4af3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -86,6 +86,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
   protected long flushPointCount = TsFileProcessor.FLUSH_POINT_COUNT_NOT_SET;
 
   protected volatile ProgressIndex overridingProgressIndex;
+  private Set<String> tableNames;
 
   public PipeTsFileInsertionEvent(final TsFileResource resource, final boolean 
isLoaded) {
     // The modFile must be copied before the event is assigned to the 
listening pipes
@@ -122,6 +123,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
     this.isGeneratedByPipe = resource.isGeneratedByPipe();
     this.isGeneratedByPipeConsensus = resource.isGeneratedByPipeConsensus();
     this.isGeneratedByHistoricalExtractor = isGeneratedByHistoricalExtractor;
+    this.tableNames = tableNames;
 
     this.dataContainer = new AtomicReference<>(null);
 
@@ -394,7 +396,7 @@ public class PipeTsFileInsertionEvent extends EnrichedEvent
               deviceID ->
                   pipePattern.mayOverlapWithDevice(((PlainDeviceID) 
deviceID).toStringID()));
     } catch (final Exception e) {
-      LOGGER.warn(
+      LOGGER.info(
           "Pipe {}: failed to get devices from TsFile {}, extract it anyway",
           pipeName,
           resource.getTsFilePath(),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
index e3278977647..0ed563d52b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileSource.java
@@ -111,7 +111,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
   private PipePattern pipePattern;
   private boolean isDbNameCoveredByPattern = false;
 
-  private boolean isHistoricalExtractorEnabled = false;
+  private boolean isHistoricalSourceEnabled = false;
   private long historicalDataExtractionStartTime = Long.MIN_VALUE; // Event 
time
   private long historicalDataExtractionEndTime = Long.MAX_VALUE; // Event time
 
@@ -174,7 +174,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
         EXTRACTOR_START_TIME_KEY,
         SOURCE_END_TIME_KEY,
         EXTRACTOR_END_TIME_KEY)) {
-      isHistoricalExtractorEnabled = true;
+      isHistoricalSourceEnabled = true;
 
       try {
         historicalDataExtractionStartTime =
@@ -214,7 +214,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
     // enabled, the pipe will lose some historical data.
     // 2. User may set the EXTRACTOR_HISTORY_START_TIME and 
EXTRACTOR_HISTORY_END_TIME without
     // enabling the historical data extraction, which may affect the realtime 
data extraction.
-    isHistoricalExtractorEnabled =
+    isHistoricalSourceEnabled =
         parameters.getBooleanOrDefault(
                 SystemConstant.RESTART_KEY, 
SystemConstant.RESTART_DEFAULT_VALUE)
             || parameters.getBooleanOrDefault(
@@ -382,7 +382,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
                 .peek(originalResourceList::add)
                 .filter(
                     resource ->
-                        isHistoricalExtractorEnabled
+                        isHistoricalSourceEnabled
                             &&
                             // Some resource is marked as deleted but not 
removed from the list.
                             !resource.isDeleted()
@@ -408,7 +408,7 @@ public class PipeHistoricalDataRegionTsFileSource 
implements PipeHistoricalDataR
                 .peek(originalResourceList::add)
                 .filter(
                     resource ->
-                        isHistoricalExtractorEnabled
+                        isHistoricalSourceEnabled
                             &&
                             // Some resource is marked as deleted but not 
removed from the list.
                             !resource.isDeleted()
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index c43b2265b17..1b4b32b1283 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -107,7 +107,9 @@ public class TsFileResource {
   protected TsFileResource next;
 
   /** time index */
-  private ITimeIndex timeIndex;
+  private volatile ITimeIndex timeIndex;
+
+  private final AtomicReference<Boolean> isEmpty = new AtomicReference<>();
 
   @SuppressWarnings("squid:S3077")
   private volatile ModificationFile modFile;
@@ -121,11 +123,11 @@ public class TsFileResource {
   /** used for check whether this file has internal unsorted data in 
compaction selection */
   private TsFileRepairStatus tsFileRepairStatus = TsFileRepairStatus.NORMAL;
 
-  private TsFileLock tsFileLock = new TsFileLock();
+  private final TsFileLock tsFileLock = new TsFileLock();
 
   private boolean isSeq;
 
-  private FSFactory fsFactory = FSFactoryProducer.getFSFactory();
+  private final FSFactory fsFactory = FSFactoryProducer.getFSFactory();
 
   private DataRegion.SettleTsFileCallBack settleTsFileCallBack;
 
@@ -647,6 +649,10 @@ public class TsFileResource {
    */
   public boolean remove() {
     forceMarkDeleted();
+    // To release the memory occupied by pipe if held by it
+    // Note that pipe can safely handle the case that the time index does not 
exist
+    isEmpty();
+    degradeTimeIndex();
     try {
       fsFactory.deleteIfExists(file);
       fsFactory.deleteIfExists(
@@ -935,13 +941,18 @@ public class TsFileResource {
    * @return resource map size
    */
   public long calculateRamSize() {
+    final ProgressIndex progressIndex = maxProgressIndex.get();
     if (timeIndex.getTimeIndexType() == ITimeIndex.FILE_TIME_INDEX_TYPE) {
-      return INSTANCE_SIZE + timeIndex.calculateRamSize();
+      return INSTANCE_SIZE
+          + timeIndex.calculateRamSize()
+          + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 
0);
     }
     if (deviceTimeIndexRamSize == 0) {
       deviceTimeIndexRamSize = timeIndex.calculateRamSize();
     }
-    return INSTANCE_SIZE + deviceTimeIndexRamSize;
+    return INSTANCE_SIZE
+        + deviceTimeIndexRamSize
+        + (Objects.nonNull(progressIndex) ? progressIndex.ramBytesUsed() : 0);
   }
 
   // used for compaction
@@ -1219,7 +1230,9 @@ public class TsFileResource {
   }
 
   public boolean isEmpty() {
-    return getFileStartTime() == Long.MAX_VALUE && getFileEndTime() == 
Long.MIN_VALUE;
+    isEmpty.compareAndSet(
+        null, getFileStartTime() == Long.MAX_VALUE && getFileEndTime() == 
Long.MIN_VALUE);
+    return isEmpty.get();
   }
 
   public String getDatabaseName() {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
index 2146808222c..d5094fa6688 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/AbstractCompactionTest.java
@@ -44,6 +44,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
+import org.apache.iotdb.db.storageengine.rescon.memory.TsFileResourceManager;
 import org.apache.iotdb.db.tools.validate.TsFileValidationTool;
 import org.apache.iotdb.db.utils.EnvironmentUtils;
 import org.apache.iotdb.db.utils.constant.TestConstant;
@@ -504,6 +505,7 @@ public class AbstractCompactionTest {
     }
     registeredTimePartitionDirs.clear();
     tsFileManager.clear();
+    TsFileResourceManager.getInstance().clear();
   }
 
   private void removeFiles() throws IOException {

Reply via email to