This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/table_disk_usage_statistics_with_cache by this push:
new d661431166b refactor
d661431166b is described below
commit d661431166b629c81e428fd9fec5e16ab1cb2ad9
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jan 23 15:51:32 2026 +0800
refactor
---
.../InformationSchemaContentSupplierFactory.java | 19 ++---
.../dataregion/utils/DiskUsageStatisticUtil.java | 2 +-
.../DataRegionTableSizeQueryContext.java | 98 +++++++++++++++++-----
.../tableDiskUsageCache/TableDiskUsageCache.java | 24 ++++--
.../TableDiskUsageCacheReader.java | 91 ++++++++++----------
5 files changed, 147 insertions(+), 87 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index 6d5ef1ae908..e4eb500319b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -1198,8 +1198,7 @@ public class InformationSchemaContentSupplierFactory {
private boolean currentDatabaseOnlyHasOneTable;
private TableDiskUsageCacheReader currentDataRegionCacheReader;
- private DataRegionTableSizeQueryContext
currentDataRegionTableSizeQueryContext =
- new DataRegionTableSizeQueryContext(false);
+ private DataRegionTableSizeQueryContext
currentDataRegionTableSizeQueryContext;
private final StorageEngineTimePartitionIterator dataRegionIterator;
@@ -1229,7 +1228,8 @@ public class InformationSchemaContentSupplierFactory {
return false;
}
currentDataRegionTableSizeQueryContext =
- new DataRegionTableSizeQueryContext(false);
+ new DataRegionTableSizeQueryContext(
+ false, operatorContext.getInstanceContext());
return
PathUtils.isTableModelDatabase(dataRegion.getDatabaseName());
}),
Optional.empty());
@@ -1260,8 +1260,7 @@ public class InformationSchemaContentSupplierFactory {
new TableDiskUsageCacheReader(
currentDataRegion,
currentDataRegionTableSizeQueryContext,
- currentDatabaseOnlyHasOneTable,
- Optional.ofNullable(operatorContext.getInstanceContext()));
+ currentDatabaseOnlyHasOneTable);
return true;
}
} catch (Exception e) {
@@ -1333,15 +1332,7 @@ public class InformationSchemaContentSupplierFactory {
return null;
}
- boolean finished = false;
- do {
- if (!currentDataRegionCacheReader.calculateNextFile()) {
- finished = true;
- break;
- }
- } while (System.nanoTime() - start < maxRuntime);
-
- if (!finished) {
+ if (!currentDataRegionCacheReader.checkAllFilesInTsFileManager(start,
maxRuntime)) {
return null;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
index c8298a7aa21..02f7339d56b 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/DiskUsageStatisticUtil.java
@@ -207,7 +207,7 @@ public abstract class DiskUsageStatisticUtil implements
Closeable {
}
@Override
- public void close() throws IOException {
+ public void close() {
releaseReadLocks();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
index 521abc4a167..8e5ec508253 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/DataRegionTableSizeQueryContext.java
@@ -19,13 +19,17 @@
package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
+import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.tsfile.utils.Accountable;
+import org.apache.tsfile.utils.RamUsageEstimator;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;
+import java.util.Optional;
public class DataRegionTableSizeQueryContext implements Accountable {
@@ -38,8 +42,17 @@ public class DataRegionTableSizeQueryContext implements
Accountable {
private long previousUsedTimePartition;
private TimePartitionTableSizeQueryContext previousUsedTimePartitionContext;
+ private final Optional<FragmentInstanceContext> fragmentInstanceContext;
+ private long acquiredMemory;
+
public DataRegionTableSizeQueryContext(boolean needAllData) {
+ this(needAllData, null);
+ }
+
+ public DataRegionTableSizeQueryContext(
+ boolean needAllData, FragmentInstanceContext fragmentInstanceContext) {
this.needAllData = needAllData;
+ this.fragmentInstanceContext =
Optional.ofNullable(fragmentInstanceContext);
}
public Map<Long, TimePartitionTableSizeQueryContext>
getTimePartitionTableSizeQueryContextMap() {
@@ -50,35 +63,29 @@ public class DataRegionTableSizeQueryContext implements
Accountable {
return timePartitionTableSizeQueryContextMap.isEmpty();
}
- public int getObjectFileNum() {
- return objectFileNum;
- }
-
- public long getObjectFileSize() {
- long totalSize = 0;
- for (TimePartitionTableSizeQueryContext timePartitionContext :
- timePartitionTableSizeQueryContextMap.values()) {
- totalSize += timePartitionContext.getObjectFileSize();
- }
- return totalSize;
+ public Optional<FragmentInstanceContext> getFragmentInstanceContext() {
+ return fragmentInstanceContext;
}
public void addCachedTsFileIDAndOffsetInValueFile(TsFileID tsFileID, long
offset) {
- switchTimePartition(tsFileID.timePartitionId);
-
previousUsedTimePartitionContext.addCachedTsFileIDAndOffsetInValueFile(tsFileID,
offset);
+ if (useTimePartition(tsFileID.timePartitionId)) {
+
previousUsedTimePartitionContext.addCachedTsFileIDAndOffsetInValueFile(tsFileID,
offset);
+ }
}
public void replaceCachedTsFileID(TsFileID originTsFileID, TsFileID
newTsFileID) {
- switchTimePartition(originTsFileID.timePartitionId);
- previousUsedTimePartitionContext.replaceCachedTsFileID(originTsFileID,
newTsFileID);
+ if (useTimePartition(originTsFileID.timePartitionId)) {
+ previousUsedTimePartitionContext.replaceCachedTsFileID(originTsFileID,
newTsFileID);
+ }
}
public void updateResult(String table, long size, long currentTimePartition)
{
- switchTimePartition(currentTimePartition);
- previousUsedTimePartitionContext.updateResult(table, size, needAllData);
+ if (useTimePartition(currentTimePartition)) {
+ previousUsedTimePartitionContext.updateResult(table, size, needAllData);
+ }
}
- private void switchTimePartition(long currentTimePartition) {
+ private boolean useTimePartition(long currentTimePartition) {
if (currentTimePartition != previousUsedTimePartition
|| previousUsedTimePartitionContext == null) {
TimePartitionTableSizeQueryContext currentTimePartitionContext =
@@ -89,11 +96,12 @@ public class DataRegionTableSizeQueryContext implements
Accountable {
? new TimePartitionTableSizeQueryContext(new HashMap<>())
: v);
if (currentTimePartitionContext == null) {
- return;
+ return false;
}
previousUsedTimePartition = currentTimePartition;
previousUsedTimePartitionContext = currentTimePartitionContext;
}
+ return true;
}
public void addTimePartition(
@@ -101,10 +109,62 @@ public class DataRegionTableSizeQueryContext implements
Accountable {
timePartitionTableSizeQueryContextMap.put(timePartition,
timePartitionTableSizeQueryContext);
}
+ public int getObjectFileNum() {
+ return objectFileNum;
+ }
+
+ public long getObjectFileSize() {
+ long totalSize = 0;
+ for (TimePartitionTableSizeQueryContext timePartitionContext :
+ timePartitionTableSizeQueryContextMap.values()) {
+ totalSize += timePartitionContext.getObjectFileSize();
+ }
+ return totalSize;
+ }
+
public void updateObjectFileNum(int delta) {
this.objectFileNum += delta;
}
+ public void reserveMemoryForResultMap() {
+ long cost =
+ RamUsageEstimator.sizeOfMapWithKnownShallowSize(
+ timePartitionTableSizeQueryContextMap,
+ RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
+ RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY);
+ reserveMemory(cost);
+ }
+
+ public void reserveMemoryForTsFileIDs() {
+ long cost =
+ timePartitionTableSizeQueryContextMap.values().stream()
+
.mapToLong(TimePartitionTableSizeQueryContext::ramBytesUsedOfTsFileIDOffsetMap)
+ .sum();
+ reserveMemory(cost);
+ }
+
+ public void reserveMemory(long size) {
+ if (!fragmentInstanceContext.isPresent()) {
+ return;
+ }
+ MemoryReservationManager memoryReservationContext =
+ fragmentInstanceContext.get().getMemoryReservationContext();
+ memoryReservationContext.reserveMemoryCumulatively(size);
+ memoryReservationContext.reserveMemoryImmediately();
+ acquiredMemory += size;
+ }
+
+ public void releaseMemory() {
+ if (!fragmentInstanceContext.isPresent()) {
+ return;
+ }
+ fragmentInstanceContext
+ .get()
+ .getMemoryReservationContext()
+ .releaseMemoryCumulatively(acquiredMemory);
+ acquiredMemory = 0;
+ }
+
@Override
public long ramBytesUsed() {
return 0;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index 132d39a3a74..7555cba1171 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -46,6 +46,7 @@ public class TableDiskUsageCache {
protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new
HashMap<>();
protected final ScheduledExecutorService scheduledExecutorService;
+ private int counter = 0;
protected volatile boolean failedToRecover = false;
protected TableDiskUsageCache() {
@@ -60,13 +61,14 @@ public class TableDiskUsageCache {
while (!Thread.currentThread().isInterrupted()) {
try {
checkAndMaySyncObjectDeltaToFile();
- Operation operation = queue.poll(10, TimeUnit.MILLISECONDS);
- if (operation == null) {
- checkAndMayCloseIdleWriter();
- checkAndMayCompact(TimeUnit.SECONDS.toMillis(1));
- continue;
+ Operation operation = queue.poll(1, TimeUnit.SECONDS);
+ if (operation != null) {
+ operation.apply(this);
+ counter++;
+ }
+ if (operation == null || counter % 1000 == 0) {
+ timedCheck();
}
- operation.apply(this);
} catch (InterruptedException e) {
return;
} catch (Exception e) {
@@ -78,6 +80,12 @@ public class TableDiskUsageCache {
}
}
+ private void timedCheck() {
+ checkAndMayCloseIdleWriter();
+ checkAndMayCompact(TimeUnit.SECONDS.toMillis(1));
+ counter = 0;
+ }
+
protected void failedToRecover(Exception e) {
failedToRecover = true;
LOGGER.error("Failed to recover TableDiskUsageCache", e);
@@ -118,9 +126,7 @@ public class TableDiskUsageCache {
}
public void writeObjectDelta(
- String database, int regionId, long timePartition, String table, long
size, int num) {
- throw new UnsupportedOperationException();
- }
+ String database, int regionId, long timePartition, String table, long
size, int num) {}
public CompletableFuture<Pair<TsFileTableSizeCacheReader,
IObjectTableSizeCacheReader>> startRead(
String database, int regionId, boolean readTsFileCache, boolean
readObjectFileCache) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
index 7c6b8b713b1..155671de97f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
@@ -19,8 +19,6 @@
package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
-import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
-import
org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import
org.apache.iotdb.db.storageengine.dataregion.utils.TableDiskUsageStatisticUtil;
@@ -28,7 +26,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.ob
import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.tsfile.TsFileTableSizeCacheReader;
import org.apache.tsfile.utils.Pair;
-import org.apache.tsfile.utils.RamUsageEstimator;
import java.io.Closeable;
import java.io.IOException;
@@ -37,7 +34,6 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class TableDiskUsageCacheReader implements Closeable {
@@ -51,13 +47,13 @@ public class TableDiskUsageCacheReader implements Closeable
{
private TsFileTableSizeCacheReader tsFileTableSizeCacheReader;
private IObjectTableSizeCacheReader objectTableSizeCacheReader;
- private long acquiredMemory;
+ private boolean objectFileSizeLoaded = false;
private boolean tsFileIdKeysPrepared = false;
+ private boolean allTsFileResourceChecked = false;
private final Iterator<Map.Entry<Long, TimePartitionTableSizeQueryContext>>
timePartitionIterator;
private final boolean currentDatabaseOnlyHasOneTable;
- private final Optional<FragmentInstanceContext> context;
private TableDiskUsageStatisticUtil tableDiskUsageStatisticUtil;
private final List<Pair<TsFileID, Long>> tsFilesToQueryInCache = new
ArrayList<>();
@@ -66,20 +62,14 @@ public class TableDiskUsageCacheReader implements Closeable
{
public TableDiskUsageCacheReader(
DataRegion dataRegion,
DataRegionTableSizeQueryContext dataRegionContext,
- boolean databaseHasOnlyOneTable,
- Optional<FragmentInstanceContext> context) {
+ boolean databaseHasOnlyOneTable) {
this.dataRegion = dataRegion;
this.regionId = Integer.parseInt(dataRegion.getDataRegionIdString());
this.dataRegionContext = dataRegionContext;
this.currentDatabaseOnlyHasOneTable = databaseHasOnlyOneTable;
- this.context = context;
this.timePartitionIterator =
dataRegionContext.getTimePartitionTableSizeQueryContextMap().entrySet().iterator();
- reserveMemory(
- RamUsageEstimator.sizeOfMapWithKnownShallowSize(
- dataRegionContext.getTimePartitionTableSizeQueryContextMap(),
- RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP,
- RamUsageEstimator.SHALLOW_SIZE_OF_HASHMAP_ENTRY));
+ dataRegionContext.reserveMemoryForResultMap();
}
public boolean prepareCacheReader(long startTime, long maxRunTime) throws
Exception {
@@ -111,9 +101,13 @@ public class TableDiskUsageCacheReader implements
Closeable {
}
public boolean loadObjectFileTableSizeCache(long startTime, long maxRunTime)
throws Exception {
+ if (objectFileSizeLoaded) {
+ return true;
+ }
if (objectTableSizeCacheReader.loadObjectFileTableSize(
dataRegionContext, startTime, maxRunTime)) {
- objectTableSizeCacheReader.close();
+ closeObjectFileTableSizeCacheReader();
+ objectFileSizeLoaded = true;
return true;
}
return false;
@@ -124,17 +118,27 @@ public class TableDiskUsageCacheReader implements
Closeable {
return true;
}
if (tsFileTableSizeCacheReader.readFromKeyFile(dataRegionContext,
startTime, maxRunTime)) {
- reserveMemory(
-
dataRegionContext.getTimePartitionTableSizeQueryContextMap().values().stream()
-
.mapToLong(TimePartitionTableSizeQueryContext::ramBytesUsedOfTsFileIDOffsetMap)
- .sum());
+ dataRegionContext.reserveMemoryForTsFileIDs();
tsFileIdKeysPrepared = true;
return true;
}
return false;
}
- public boolean calculateNextFile() {
+ public boolean checkAllFilesInTsFileManager(long start, long maxRunTime) {
+ if (allTsFileResourceChecked) {
+ return true;
+ }
+ do {
+ if (!calculateNextFile()) {
+ allTsFileResourceChecked = true;
+ break;
+ }
+ } while (System.nanoTime() - start < maxRunTime);
+ return allTsFileResourceChecked;
+ }
+
+ private boolean calculateNextFile() {
while (true) {
if (tableDiskUsageStatisticUtil != null &&
tableDiskUsageStatisticUtil.hasNextFile()) {
tableDiskUsageStatisticUtil.calculateNextFile();
@@ -144,6 +148,7 @@ public class TableDiskUsageCacheReader implements Closeable
{
Map.Entry<Long, TimePartitionTableSizeQueryContext>
currentTimePartitionEntry =
timePartitionIterator.next();
long timePartition = currentTimePartitionEntry.getKey();
+ closeTableDiskUsageStatisticUtil();
tableDiskUsageStatisticUtil =
new TableDiskUsageStatisticUtil(
dataRegion,
@@ -151,8 +156,9 @@ public class TableDiskUsageCacheReader implements Closeable
{
currentTimePartitionEntry.getValue(),
currentDatabaseOnlyHasOneTable,
tsFilesToQueryInCache,
- context);
+ dataRegionContext.getFragmentInstanceContext());
} else {
+ closeTableDiskUsageStatisticUtil();
return false;
}
}
@@ -173,38 +179,35 @@ public class TableDiskUsageCacheReader implements
Closeable {
return dataRegion;
}
- private void reserveMemory(long size) {
- if (context.isPresent()) {
- MemoryReservationManager memoryReservationContext =
- context.get().getMemoryReservationContext();
- memoryReservationContext.reserveMemoryCumulatively(size);
- memoryReservationContext.reserveMemoryImmediately();
- acquiredMemory += size;
+ @Override
+ public void close() throws IOException {
+ closeTableDiskUsageStatisticUtil();
+ closeTsFileTableSizeCacheReader();
+ closeObjectFileTableSizeCacheReader();
+ if (prepareReaderFuture != null) {
+ TableDiskUsageCache.getInstance().endRead(dataRegion.getDatabaseName(),
regionId);
+ prepareReaderFuture = null;
}
+ dataRegionContext.releaseMemory();
}
- @Override
- public void close() throws IOException {
+ private void closeTableDiskUsageStatisticUtil() {
+ if (tableDiskUsageStatisticUtil != null) {
+ tableDiskUsageStatisticUtil.close();
+ tableDiskUsageStatisticUtil = null;
+ }
+ }
+
+ private void closeTsFileTableSizeCacheReader() {
if (tsFileTableSizeCacheReader != null) {
tsFileTableSizeCacheReader.closeCurrentFile();
- tsFileTableSizeCacheReader = null;
}
+ }
+
+ private void closeObjectFileTableSizeCacheReader() {
if (objectTableSizeCacheReader != null) {
objectTableSizeCacheReader.close();
objectTableSizeCacheReader = null;
}
- if (prepareReaderFuture != null) {
- TableDiskUsageCache.getInstance().endRead(dataRegion.getDatabaseName(),
regionId);
- prepareReaderFuture = null;
- }
- releaseMemory();
- }
-
- private void releaseMemory() {
- if (!context.isPresent()) {
- return;
- }
-
context.get().getMemoryReservationContext().releaseMemoryCumulatively(acquiredMemory);
- acquiredMemory = 0;
}
}