This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/table_disk_usage_statistics_with_cache by this push:
new 51336a36b43 close
51336a36b43 is described below
commit 51336a36b437c059cee2fa3c49dc89cc85386ff4
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jan 27 10:08:50 2026 +0800
close
---
.../iotdb/db/storageengine/StorageEngine.java | 2 ++
.../tableDiskUsageCache/TableDiskUsageCache.java | 39 +++++++++++++++++-----
2 files changed, 33 insertions(+), 8 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index 964f6edcd76..66d1d6d0d79 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -71,6 +71,7 @@ import org.apache.iotdb.db.storageengine.dataregion.flush.CompressionRatio;
import org.apache.iotdb.db.storageengine.dataregion.flush.FlushListener;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
+import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache;
import org.apache.iotdb.db.storageengine.dataregion.wal.WALManager;
import org.apache.iotdb.db.storageengine.dataregion.wal.exception.WALException;
import org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
@@ -419,6 +420,7 @@ public class StorageEngine implements IService {
if (cachedThreadPool != null) {
cachedThreadPool.shutdownNow();
}
+ TableDiskUsageCache.getInstance().close();
dataRegionMap.clear();
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index 30d9fec327b..f7ee6057c86 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
+import org.apache.iotdb.commons.utils.PathUtils;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
@@ -44,12 +45,13 @@ import java.util.concurrent.TimeUnit;
public class TableDiskUsageCache {
protected static final Logger LOGGER =
LoggerFactory.getLogger(TableDiskUsageCache.class);
- protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
+ protected final BlockingQueue<Operation> queue = new LinkedBlockingQueue<>(1000);
// regionId -> writer mapping
protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new HashMap<>();
protected final ScheduledExecutorService scheduledExecutorService;
private int processedOperationCountSinceLastPeriodicCheck = 0;
protected volatile boolean failedToRecover = false;
+ private volatile boolean stop = false;
protected TableDiskUsageCache() {
scheduledExecutorService =
@@ -60,7 +62,7 @@ public class TableDiskUsageCache {
protected void run() {
try {
- while (!Thread.currentThread().isInterrupted()) {
+ while (!stop) {
try {
for (DataRegionTableSizeCacheWriter writer : writerMap.values()) {
syncTsFileTableSizeCacheIfNecessary(writer);
@@ -75,6 +77,7 @@ public class TableDiskUsageCache {
performPeriodicMaintenance();
}
} catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
return;
} catch (Exception e) {
LOGGER.error("Meet exception when apply TableDiskUsageCache operation.", e);
@@ -163,6 +166,9 @@ public class TableDiskUsageCache {
public void registerRegion(DataRegion region) {
RegisterRegionOperation operation = new RegisterRegionOperation(region);
+ if (!PathUtils.isTableModelDatabase(region.getDatabaseName())) {
+ return;
+ }
addOperationToQueue(operation);
}
@@ -173,22 +179,38 @@ public class TableDiskUsageCache {
operation.future.get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
- } catch (Exception ignored) {
+ } catch (Exception e) {
+ LOGGER.error("Meet exception when remove TableDiskUsageCache.", e);
}
}
protected void addOperationToQueue(Operation operation) {
- if (failedToRecover) {
+ if (failedToRecover || stop) {
return;
}
- queue.add(operation);
+ try {
+ queue.put(operation);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ public int getQueueSize() {
+ return queue.size();
}
public void close() {
- if (scheduledExecutorService != null) {
- scheduledExecutorService.shutdownNow();
+ if (scheduledExecutorService == null) {
+ return;
+ }
+ try {
+ stop = true;
+ scheduledExecutorService.shutdown();
+ scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
+ writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
}
- writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
}
protected DataRegionTableSizeCacheWriter createWriter(
@@ -385,6 +407,7 @@ public class TableDiskUsageCache {
return writer;
}
writer.close();
+ future.complete(null);
return null;
});
}