This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/table_disk_usage_statistics_with_cache by this push:
new 8b115ea6f0f rename
8b115ea6f0f is described below
commit 8b115ea6f0f2260375d50a199861a8551996951e
Author: shuwenwei <[email protected]>
AuthorDate: Mon Jan 26 15:19:13 2026 +0800
rename
---
.../tableDiskUsageCache/TableDiskUsageCache.java | 37 +++++++++++++++-------
.../tsfile/TsFileTableDiskUsageCacheWriter.java | 9 ++++++
2 files changed, 34 insertions(+), 12 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index a0f2fa4bab2..c0042ff372c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -47,7 +47,7 @@ public class TableDiskUsageCache {
protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new HashMap<>();
protected final ScheduledExecutorService scheduledExecutorService;
- private int counter = 0;
+ private int processedOperationCount = 0;
protected volatile boolean failedToRecover = false;
protected TableDiskUsageCache() {
@@ -61,14 +61,17 @@ public class TableDiskUsageCache {
try {
while (!Thread.currentThread().isInterrupted()) {
try {
- checkAndMaySyncObjectDeltaToFile();
+ for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
+ syncTsFileTableSizeCacheIfNecessary(writer);
+ persistPendingObjectDeltasIfNecessary(writer);
+ }
Operation operation = queue.poll(1, TimeUnit.SECONDS);
if (operation != null) {
operation.apply(this);
- counter++;
+ processedOperationCount++;
}
- if (operation == null || counter % 1000 == 0) {
- timedCheck();
+ if (operation == null || processedOperationCount % 1000 == 0) {
+ performPeriodicMaintenance();
}
} catch (InterruptedException e) {
return;
@@ -81,10 +84,10 @@ public class TableDiskUsageCache {
}
}
- private void timedCheck() {
+ private void performPeriodicMaintenance() {
checkAndMayCloseIdleWriter();
- checkAndMayCompact(TimeUnit.SECONDS.toMillis(1));
- counter = 0;
+ compactIfNecessary(TimeUnit.SECONDS.toMillis(1));
+ processedOperationCount = 0;
}
/**
@@ -96,9 +99,18 @@ public class TableDiskUsageCache {
LOGGER.error("Failed to recover TableDiskUsageCache", e);
}
- protected void checkAndMaySyncObjectDeltaToFile() {}
+ protected void syncTsFileTableSizeCacheIfNecessary(DataRegionTableSizeCacheWriter writer) {
+ try {
+ writer.tsFileCacheWriter.syncIfNecessary();
+ } catch (IOException e) {
+ LOGGER.warn("Failed to sync tsfile table size cache.", e);
+ }
+ }
- protected void checkAndMayCompact(long maxRunTime) {
+ // Hook for subclasses to persist pending object table size deltas. No-op by default.
+ protected void persistPendingObjectDeltasIfNecessary(DataRegionTableSizeCacheWriter writer) {}
+
+ protected void compactIfNecessary(long maxRunTime) {
long startTime = System.currentTimeMillis();
for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
if (System.currentTimeMillis() - startTime > maxRunTime) {
@@ -239,7 +251,7 @@ public class TableDiskUsageCache {
DataRegionTableSizeCacheWriter writer =
tableDiskUsageCache.writerMap.get(regionId);
try {
if (writer == null || writer.getRemovedFuture() != null) {
- // region is removed
+ // region is removing or removed
future.complete(
new Pair<>(
new TsFileTableSizeCacheReader(0, null, 0, null, regionId),
@@ -279,6 +291,7 @@ public class TableDiskUsageCache {
}
writer.decreaseActiveReaderNum();
if (writer.getRemovedFuture() != null) {
+ writer.close();
writer.getRemovedFuture().complete(null);
writer.setRemovedFuture(null);
return null;
@@ -363,11 +376,11 @@ public class TableDiskUsageCache {
tableDiskUsageCache.writerMap.computeIfPresent(
regionId,
(k, writer) -> {
- writer.close();
if (writer.getActiveReaderNum() > 0) {
writer.setRemovedFuture(future);
return writer;
}
+ writer.close();
return null;
});
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
index d4ef7ad5a95..6c0e4ff72a3 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
@@ -51,6 +51,7 @@ public class TsFileTableDiskUsageCacheWriter extends
AbstractTableSizeCacheWrite
public static final byte KEY_FILE_RECORD_TYPE_REDIRECT = 2;
private TsFileTableSizeIndexFileWriter tsFileTableSizeIndexFileWriter;
+ private long lastSyncTimestamp = System.currentTimeMillis();
public TsFileTableDiskUsageCacheWriter(String database, int regionId) {
super(database, regionId);
@@ -281,9 +282,17 @@ public class TsFileTableDiskUsageCacheWriter extends
AbstractTableSizeCacheWrite
return tsFileTableSizeIndexFileWriter.valueFileLength();
}
+ public void syncIfNecessary() throws IOException {
+ if (System.currentTimeMillis() - Math.max(lastWriteTimestamp, lastSyncTimestamp)
+ >= TimeUnit.MINUTES.toMillis(10)) {
+ sync();
+ }
+ }
+
@Override
public void sync() throws IOException {
tsFileTableSizeIndexFileWriter.sync();
+ lastSyncTimestamp = System.currentTimeMillis();
}
@Override