This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to 
refs/heads/table_disk_usage_statistics_with_cache by this push:
     new d661431166b refactor
d661431166b is described below

commit d661431166b629c81e428fd9fec5e16ab1cb2ad9
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jan 23 15:51:32 2026 +0800

    refactor
---
 .../InformationSchemaContentSupplierFactory.java   | 19 ++---
 .../dataregion/utils/DiskUsageStatisticUtil.java   |  2 +-
 .../DataRegionTableSizeQueryContext.java           | 98 +++++++++++++++++-----
 .../tableDiskUsageCache/TableDiskUsageCache.java   | 24 ++++--
 .../TableDiskUsageCacheReader.java                 | 91 ++++++++++----------
 5 files changed, 147 insertions(+), 87 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index 6d5ef1ae908..e4eb500319b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -1198,8 +1198,7 @@ public class InformationSchemaContentSupplierFactory {
     private boolean currentDatabaseOnlyHasOneTable;
 
     private TableDiskUsageCacheReader currentDataRegionCacheReader;
-    private DataRegionTableSizeQueryContext 
currentDataRegionTableSizeQueryContext =
-        new DataRegionTableSizeQueryContext(false);
+    private DataRegionTableSizeQueryContext 
currentDataRegionTableSizeQueryContext;
 
     private final StorageEngineTimePartitionIterator dataRegionIterator;
 
@@ -1229,7 +1228,8 @@ public class InformationSchemaContentSupplierFactory {
                       return false;
                     }
                     currentDataRegionTableSizeQueryContext =
-                        new DataRegionTableSizeQueryContext(false);
+                        new DataRegionTableSizeQueryContext(
+                            false, operatorContext.getInstanceContext());
                     return 
PathUtils.isTableModelDatabase(dataRegion.getDatabaseName());
                   }),
               Optional.empty());
@@ -1260,8 +1260,7 @@ public class InformationSchemaContentSupplierFactory {
               new TableDiskUsageCacheReader(
                   currentDataRegion,
                   currentDataRegionTableSizeQueryContext,
-                  currentDatabaseOnlyHasOneTable,
-                  Optional.ofNullable(operatorContext.getInstanceContext()));
+                  currentDatabaseOnlyHasOneTable);
           return true;
         }
       } catch (Exception e) {
@@ -1333,15 +1332,7 @@ public class InformationSchemaContentSupplierFactory {
           return null;
         }
 
-        boolean finished = false;
-        do {
-          if (!currentDataRegionCacheReader.calculateNextFile()) {
-            finished = true;
-            break;
-          }
-        } while (System.nanoTime() - start < maxRuntime);
-
-        if (!finished) {
+        if (!currentDataRegionCacheReader.checkAllFilesInTsFileManager(start, 
maxRuntime)) {
           return null;
         }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
index c8298a7aa21..02f7339d56b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
@@ -207,7 +207,7 @@ public abstract class DiskUsageStatisticUtil implements 
Closeable {
   }
 
   @Override
-  public void close() throws IOException {
+  public void close() {
     releaseReadLocks();
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
index 521abc4a167..8e5ec508253 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
@@ -19,13 +19,17 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
 
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 
 import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
+import java.util.Optional;
 
 public class DataRegionTableSizeQueryContext implements Accountable {
 
@@ -38,8 +42,17 @@ public class DataRegionTableSizeQueryContext implements 
Accountable {
   private long previousUsedTimePartition;
   private TimePartitionTableSizeQueryContext previousUsedTimePartitionContext;
 
+  private final Optional<FragmentInstanceContext> fragmentInstanceContext;
+  private long acquiredMemory;
+
   public DataRegionTableSizeQueryContext(boolean needAllData) {
+    this(needAllData, null);
+  }
+
+  public DataRegionTableSizeQueryContext(
+      boolean needAllData, FragmentInstanceContext fragmentInstanceContext) {
     this.needAllData = needAllData;
+    this.fragmentInstanceContext = 
Optional.ofNullable(fragmentInstanceContext);
   }
 
   public Map<Long, TimePartitionTableSizeQueryContext> 
getTimePartitionTableSizeQueryContextMap() {
@@ -50,35 +63,29 @@ public class DataRegionTableSizeQueryContext implements 
Accountable {
     return timePartitionTableSizeQueryContextMap.isEmpty();
   }
 
-  public int getObjectFileNum() {
-    return objectFileNum;
-  }
-
-  public long getObjectFileSize() {
-    long totalSize = 0;
-    for (TimePartitionTableSizeQueryContext timePartitionContext :
-        timePartitionTableSizeQueryContextMap.values()) {
-      totalSize += timePartitionContext.getObjectFileSize();
-    }
-    return totalSize;
+  public Optional<FragmentInstanceContext> getFragmentInstanceContext() {
+    return fragmentInstanceContext;
   }
 
   public void addCachedTsFileIDAndOffsetInValueFile(TsFileID tsFileID, long 
offset) {
-    switchTimePartition(tsFileID.timePartitionId);
-    
previousUsedTimePartitionContext.addCachedTsFileIDAndOffsetInValueFile(tsFileID,
 offset);
+    if (useTimePartition(tsFileID.timePartitionId)) {
+      
previousUsedTimePartitionContext.addCachedTsFileIDAndOffsetInValueFile(tsFileID,
 offset);
+    }
   }
 
   public void replaceCachedTsFileID(TsFileID originTsFileID, TsFileID 
newTsFileID) {
-    switchTimePartition(originTsFileID.timePartitionId);
-    previousUsedTimePartitionContext.replaceCachedTsFileID(originTsFileID, 
newTsFileID);
+    if (useTimePartition(originTsFileID.timePartitionId)) {
+      previousUsedTimePartitionContext.replaceCachedTsFileID(originTsFileID, 
newTsFileID);
+    }
   }
 
   public void updateResult(String table, long size, long currentTimePartition) 
{
-    switchTimePartition(currentTimePartition);
-    previousUsedTimePartitionContext.updateResult(table, size, needAllData);
+    if (useTimePartition(currentTimePartition)) {
+      previousUsedTimePartitionContext.updateResult(table, size, needAllData);
+    }
   }
 
-  private void switchTimePartition(long currentTimePartition) {
+  private boolean useTimePartition(long currentTimePartition) {
     if (currentTimePartition != previousUsedTimePartition
         || previousUsedTimePartitionContext == null) {
       TimePartitionTableSizeQueryContext currentTimePartitionContext =
@@ -89,11 +96,12 @@ public class DataRegionTableSizeQueryContext implements 
Accountable {
                       ? new TimePartitionTableSizeQueryContext(new HashMap<>())
                       : v);
       if (currentTimePartitionContext == null) {
-        return;
+        return false;
       }
       previousUsedTimePartition = currentTimePartition;
       previousUsedTimePartitionContext = currentTimePartitionContext;
     }
+    return true;
   }
 
   public void addTimePartition(
@@ -101,10 +109,62 @@ public class DataRegionTableSizeQueryContext implements 
Accountable {
     timePartitionTableSizeQueryContextMap.put(timePartition, 
timePartitionTableSizeQueryContext);
   }
 
+  public int getObjectFileNum() {
+    return objectFileNum;
+  }
+
+  public long getObjectFileSize() {
+    long totalSize = 0;
+    for (TimePartitionTableSizeQueryContext timePartitionContext :
+        timePartitionTableSizeQueryContextMap.values()) {
+      totalSize += timePartitionContext.getObjectFileSize();
+    }
+    return totalSize;
+  }
+
   public void updateObjectFileNum(int delta) {
     this.objectFileNum += delta;
   }
 
+  public void reserveMemoryForResultMap() {
+    long cost =
+        RamUsageEstimator.sizeOfMapWithKnownShallowSize(
+            timePartitionTableSizeQueryContextMap,
+            RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
+            RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+    reserveMemory(cost);
+  }
+
+  public void reserveMemoryForTsFileIDs() {
+    long cost =
+        timePartitionTableSizeQueryContextMap.values().stream()
+            
.mapToLong(TimePartitionTableSizeQueryContext::ramBytesUsedOfTsFileIDOffsetMap)
+            .sum();
+    reserveMemory(cost);
+  }
+
+  public void reserveMemory(long size) {
+    if (!fragmentInstanceContext.isPresent()) {
+      return;
+    }
+    MemoryReservationManager memoryReservationContext =
+        fragmentInstanceContext.get().getMemoryReservationContext();
+    memoryReservationContext.reserveMemoryCumulatively(size);
+    memoryReservationContext.reserveMemoryImmediately();
+    acquiredMemory += size;
+  }
+
+  public void releaseMemory() {
+    if (!fragmentInstanceContext.isPresent()) {
+      return;
+    }
+    fragmentInstanceContext
+        .get()
+        .getMemoryReservationContext()
+        .releaseMemoryCumulatively(acquiredMemory);
+    acquiredMemory = 0;
+  }
+
   @Override
   public long ramBytesUsed() {
     return 0;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index 132d39a3a74..7555cba1171 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -46,6 +46,7 @@ public class TableDiskUsageCache {
   protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
   protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new 
HashMap<>();
   protected final ScheduledExecutorService scheduledExecutorService;
+  private int counter = 0;
   protected volatile boolean failedToRecover = false;
 
   protected TableDiskUsageCache() {
@@ -60,13 +61,14 @@ public class TableDiskUsageCache {
       while (!Thread.currentThread().isInterrupted()) {
         try {
           checkAndMaySyncObjectDeltaToFile();
-          Operation operation = queue.poll(10, TimeUnit.MILLISECONDS);
-          if (operation == null) {
-            checkAndMayCloseIdleWriter();
-            checkAndMayCompact(TimeUnit.SECONDS.toMillis(1));
-            continue;
+          Operation operation = queue.poll(1, TimeUnit.SECONDS);
+          if (operation != null) {
+            operation.apply(this);
+            counter++;
+          }
+          if (operation == null || counter % 1000 == 0) {
+            timedCheck();
           }
-          operation.apply(this);
         } catch (InterruptedException e) {
           return;
         } catch (Exception e) {
@@ -78,6 +80,12 @@ public class TableDiskUsageCache {
     }
   }
 
+  private void timedCheck() {
+    checkAndMayCloseIdleWriter();
+    checkAndMayCompact(TimeUnit.SECONDS.toMillis(1));
+    counter = 0;
+  }
+
   protected void failedToRecover(Exception e) {
     failedToRecover = true;
     LOGGER.error("Failed to recover TableDiskUsageCache", e);
@@ -118,9 +126,7 @@ public class TableDiskUsageCache {
   }
 
   public void writeObjectDelta(
-      String database, int regionId, long timePartition, String table, long 
size, int num) {
-    throw new UnsupportedOperationException();
-  }
+      String database, int regionId, long timePartition, String table, long 
size, int num) {}
 
   public CompletableFuture<Pair<TsFileTableSizeCacheReader, 
IObjectTableSizeCacheReader>> startRead(
       String database, int regionId, boolean readTsFileCache, boolean 
readObjectFileCache) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
index 7c6b8b713b1..155671de97f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
@@ -19,8 +19,6 @@
 
 package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
 
-import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import 
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
 import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
 import 
org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
@@ -28,7 +26,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.ob
 import 
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
 
 import org.apache.tsfile.utils.Pair;
-import org.apache.tsfile.utils.RamUsageEstimator;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -37,7 +34,6 @@ import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 public class TableDiskUsageCacheReader implements Closeable {
@@ -51,13 +47,13 @@ public class TableDiskUsageCacheReader implements Closeable 
{
   private TsFileTableSizeCacheReader tsFileTableSizeCacheReader;
   private IObjectTableSizeCacheReader objectTableSizeCacheReader;
 
-  private long acquiredMemory;
+  private boolean objectFileSizeLoaded = false;
   private boolean tsFileIdKeysPrepared = false;
+  private boolean allTsFileResourceChecked = false;
 
   private final Iterator<Map.Entry<Long, TimePartitionTableSizeQueryContext>> 
timePartitionIterator;
 
   private final boolean currentDatabaseOnlyHasOneTable;
-  private final Optional<FragmentInstanceContext> context;
   private TableDiskUsageStatisticUtil tableDiskUsageStatisticUtil;
 
   private final List<Pair<TsFileID, Long>> tsFilesToQueryInCache = new 
ArrayList<>();
@@ -66,20 +62,14 @@ public class TableDiskUsageCacheReader implements Closeable 
{
   public TableDiskUsageCacheReader(
       DataRegion dataRegion,
       DataRegionTableSizeQueryContext dataRegionContext,
-      boolean databaseHasOnlyOneTable,
-      Optional<FragmentInstanceContext> context) {
+      boolean databaseHasOnlyOneTable) {
     this.dataRegion = dataRegion;
     this.regionId = Integer.parseInt(dataRegion.getDataRegionIdString());
     this.dataRegionContext = dataRegionContext;
     this.currentDatabaseOnlyHasOneTable = databaseHasOnlyOneTable;
-    this.context = context;
     this.timePartitionIterator =
         
dataRegionContext.getTimePartitionTableSizeQueryContextMap().entrySet().iterator();
-    reserveMemory(
-        RamUsageEstimator.sizeOfMapWithKnownShallowSize(
-            dataRegionContext.getTimePartitionTableSizeQueryContextMap(),
-            RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
-            RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY));
+    dataRegionContext.reserveMemoryForResultMap();
   }
 
   public boolean prepareCacheReader(long startTime, long maxRunTime) throws 
Exception {
@@ -111,9 +101,13 @@ public class TableDiskUsageCacheReader implements 
Closeable {
   }
 
   public boolean loadObjectFileTableSizeCache(long startTime, long maxRunTime) 
throws Exception {
+    if (objectFileSizeLoaded) {
+      return true;
+    }
     if (objectTableSizeCacheReader.loadObjectFileTableSize(
         dataRegionContext, startTime, maxRunTime)) {
-      objectTableSizeCacheReader.close();
+      closeObjectFileTableSizeCacheReader();
+      objectFileSizeLoaded = true;
       return true;
     }
     return false;
@@ -124,17 +118,27 @@ public class TableDiskUsageCacheReader implements 
Closeable {
       return true;
     }
     if (tsFileTableSizeCacheReader.readFromKeyFile(dataRegionContext, 
startTime, maxRunTime)) {
-      reserveMemory(
-          
dataRegionContext.getTimePartitionTableSizeQueryContextMap().values().stream()
-              
.mapToLong(TimePartitionTableSizeQueryContext::ramBytesUsedOfTsFileIDOffsetMap)
-              .sum());
+      dataRegionContext.reserveMemoryForTsFileIDs();
       tsFileIdKeysPrepared = true;
       return true;
     }
     return false;
   }
 
-  public boolean calculateNextFile() {
+  public boolean checkAllFilesInTsFileManager(long start, long maxRunTime) {
+    if (allTsFileResourceChecked) {
+      return true;
+    }
+    do {
+      if (!calculateNextFile()) {
+        allTsFileResourceChecked = true;
+        break;
+      }
+    } while (System.nanoTime() - start < maxRunTime);
+    return allTsFileResourceChecked;
+  }
+
+  private boolean calculateNextFile() {
     while (true) {
       if (tableDiskUsageStatisticUtil != null && 
tableDiskUsageStatisticUtil.hasNextFile()) {
         tableDiskUsageStatisticUtil.calculateNextFile();
@@ -144,6 +148,7 @@ public class TableDiskUsageCacheReader implements Closeable 
{
         Map.Entry<Long, TimePartitionTableSizeQueryContext> 
currentTimePartitionEntry =
             timePartitionIterator.next();
         long timePartition = currentTimePartitionEntry.getKey();
+        closeTableDiskUsageStatisticUtil();
         tableDiskUsageStatisticUtil =
             new TableDiskUsageStatisticUtil(
                 dataRegion,
@@ -151,8 +156,9 @@ public class TableDiskUsageCacheReader implements Closeable 
{
                 currentTimePartitionEntry.getValue(),
                 currentDatabaseOnlyHasOneTable,
                 tsFilesToQueryInCache,
-                context);
+                dataRegionContext.getFragmentInstanceContext());
       } else {
+        closeTableDiskUsageStatisticUtil();
         return false;
       }
     }
@@ -173,38 +179,35 @@ public class TableDiskUsageCacheReader implements 
Closeable {
     return dataRegion;
   }
 
-  private void reserveMemory(long size) {
-    if (context.isPresent()) {
-      MemoryReservationManager memoryReservationContext =
-          context.get().getMemoryReservationContext();
-      memoryReservationContext.reserveMemoryCumulatively(size);
-      memoryReservationContext.reserveMemoryImmediately();
-      acquiredMemory += size;
+  @Override
+  public void close() throws IOException {
+    closeTableDiskUsageStatisticUtil();
+    closeTsFileTableSizeCacheReader();
+    closeObjectFileTableSizeCacheReader();
+    if (prepareReaderFuture != null) {
+      TableDiskUsageCache.getInstance().endRead(dataRegion.getDatabaseName(), 
regionId);
+      prepareReaderFuture = null;
     }
+    dataRegionContext.releaseMemory();
   }
 
-  @Override
-  public void close() throws IOException {
+  private void closeTableDiskUsageStatisticUtil() {
+    if (tableDiskUsageStatisticUtil != null) {
+      tableDiskUsageStatisticUtil.close();
+      tableDiskUsageStatisticUtil = null;
+    }
+  }
+
+  private void closeTsFileTableSizeCacheReader() {
     if (tsFileTableSizeCacheReader != null) {
       tsFileTableSizeCacheReader.closeCurrentFile();
-      tsFileTableSizeCacheReader = null;
     }
+  }
+
+  private void closeObjectFileTableSizeCacheReader() {
     if (objectTableSizeCacheReader != null) {
       objectTableSizeCacheReader.close();
       objectTableSizeCacheReader = null;
     }
-    if (prepareReaderFuture != null) {
-      TableDiskUsageCache.getInstance().endRead(dataRegion.getDatabaseName(), 
regionId);
-      prepareReaderFuture = null;
-    }
-    releaseMemory();
-  }
-
-  private void releaseMemory() {
-    if (!context.isPresent()) {
-      return;
-    }
-    
context.get().getMemoryReservationContext().releaseMemoryCumulatively(acquiredMemory);
-    acquiredMemory = 0;
   }
 }

Reply via email to