This is an automated email from the ASF dual-hosted git repository.
shuwenwei pushed a commit to branch table_disk_usage_statistics_with_cache
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to
refs/heads/table_disk_usage_statistics_with_cache by this push:
new 83482fa7381 add ut
83482fa7381 is described below
commit 83482fa738115a645116017b5232e70abbd3dc4a
Author: shuwenwei <[email protected]>
AuthorDate: Wed Jan 28 15:36:27 2026 +0800
add ut
---
.../InformationSchemaContentSupplierFactory.java | 3 +
.../tableDiskUsageCache/TableDiskUsageCache.java | 34 ++-
.../TableDiskUsageCacheReader.java | 2 -
.../tsfile/TsFileTableDiskUsageCacheWriter.java | 1 -
.../dataregion/utils/TableDiskUsageTest.java | 234 +++++++++++++++++++++
5 files changed, 266 insertions(+), 8 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
index 9349bc9dfdb..5b486713e8f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/InformationSchemaContentSupplierFactory.java
@@ -1214,6 +1214,9 @@ public class InformationSchemaContentSupplierFactory {
AuthorityChecker.getAccessControl().checkUserGlobalSysPrivilege(userEntity);
try (final ConfigNodeClient client =
ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID))
{
+ // It is better to use an async ConfigNode client here.
+ // Using a synchronous client may block the calling thread when the ConfigNode response is
+ // slow or temporarily unavailable, which can cause the operator to exceed its maxRunTime.
this.databaseTableInfoMap =
client.showTables4InformationSchema().getDatabaseTableInfoMap();
}
this.dataRegionIterator =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
index f7ee6057c86..9e8795b1594 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCache.java
@@ -22,6 +22,7 @@ package
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.commons.utils.PathUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileID;
import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.object.EmptyObjectTableSizeCacheReader;
@@ -48,7 +49,7 @@ public class TableDiskUsageCache {
protected final BlockingQueue<Operation> queue = new
LinkedBlockingQueue<>(1000);
// regionId -> writer mapping
protected final Map<Integer, DataRegionTableSizeCacheWriter> writerMap = new
HashMap<>();
- protected final ScheduledExecutorService scheduledExecutorService;
+ protected ScheduledExecutorService scheduledExecutorService;
private int processedOperationCountSinceLastPeriodicCheck = 0;
protected volatile boolean failedToRecover = false;
private volatile boolean stop = false;
@@ -155,7 +156,13 @@ public class TableDiskUsageCache {
DataRegion dataRegion, boolean readTsFileCache, boolean
readObjectFileCache) {
StartReadOperation operation =
new StartReadOperation(dataRegion, readTsFileCache,
readObjectFileCache);
- addOperationToQueue(operation);
+ if (!addOperationToQueue(operation)) {
+ operation.future.complete(
+ new Pair<>(
+ new TsFileTableSizeCacheReader(
+ 0, null, 0, null, dataRegion.getDataRegionId().getId()),
+ new EmptyObjectTableSizeCacheReader()));
+ }
return operation.future;
}
@@ -174,7 +181,9 @@ public class TableDiskUsageCache {
public void remove(String database, int regionId) {
RemoveRegionOperation operation = new RemoveRegionOperation(database,
regionId);
- addOperationToQueue(operation);
+ if (!addOperationToQueue(operation)) {
+ return;
+ }
try {
operation.future.get(5, TimeUnit.SECONDS);
} catch (InterruptedException e) {
@@ -184,15 +193,17 @@ public class TableDiskUsageCache {
}
}
- protected void addOperationToQueue(Operation operation) {
+ protected boolean addOperationToQueue(Operation operation) {
if (failedToRecover || stop) {
- return;
+ return false;
}
try {
queue.put(operation);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
+ return false;
}
+ return true;
}
public int getQueueSize() {
@@ -208,11 +219,24 @@ public class TableDiskUsageCache {
scheduledExecutorService.shutdown();
scheduledExecutorService.awaitTermination(5, TimeUnit.SECONDS);
writerMap.values().forEach(DataRegionTableSizeCacheWriter::close);
+ writerMap.clear();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
+  /**
+   * Puts the cache back into a state where operations are accepted again (test use only).
+   *
+   * <p>Clears the {@code stop} and {@code failedToRecover} flags, which otherwise make
+   * {@code addOperationToQueue} reject every operation. If the current executor has fully
+   * terminated, it is replaced by a new single-thread scheduled executor and {@code run()} is
+   * submitted to it; an executor that has not terminated is left untouched.
+   */
+  @TestOnly
+  public void ensureRunning() {
+    stop = false;
+    failedToRecover = false;
+    if (!scheduledExecutorService.isTerminated()) {
+      // The existing executor has not terminated, so nothing needs to be recreated.
+      return;
+    }
+    final ScheduledExecutorService restarted =
+        IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(
+            ThreadName.FILE_TIME_INDEX_RECORD.getName());
+    scheduledExecutorService = restarted;
+    restarted.submit(this::run);
+  }
+
protected DataRegionTableSizeCacheWriter createWriter(
String database, int regionId, DataRegion region) {
return new DataRegionTableSizeCacheWriter(database, regionId, region);
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
index c72bdc72e3c..6d26856824f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/TableDiskUsageCacheReader.java
@@ -39,7 +39,6 @@ import java.util.concurrent.CompletableFuture;
public class TableDiskUsageCacheReader implements Closeable {
private final DataRegion dataRegion;
- private final int regionId;
private final DataRegionTableSizeQueryContext dataRegionContext;
private CompletableFuture<Pair<TsFileTableSizeCacheReader,
IObjectTableSizeCacheReader>>
@@ -64,7 +63,6 @@ public class TableDiskUsageCacheReader implements Closeable {
DataRegionTableSizeQueryContext dataRegionContext,
boolean databaseHasOnlyOneTable) {
this.dataRegion = dataRegion;
- this.regionId = Integer.parseInt(dataRegion.getDataRegionIdString());
this.dataRegionContext = dataRegionContext;
this.currentDatabaseOnlyHasOneTable = databaseHasOnlyOneTable;
this.timePartitionIterator =
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
index 2ecd3d39662..7c60d38fc13 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/tableDiskUsageCache/tsfile/TsFileTableDiskUsageCacheWriter.java
@@ -130,7 +130,6 @@ public class TsFileTableDiskUsageCacheWriter extends
AbstractTableSizeCacheWrite
}
private int getVersion(String fileName) throws NumberFormatException {
- int version = 0;
String removePrefixStr =
fileName.substring(TSFILE_CACHE_KEY_FILENAME_PREFIX.length());
int suffixIdx = removePrefixStr.indexOf('.');
return Integer.parseInt(
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageTest.java
new file mode 100644
index 00000000000..d8ddc173dfe
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/utils/TableDiskUsageTest.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.dataregion.utils;
+
+import org.apache.iotdb.commons.consensus.DataRegionId;
+import org.apache.iotdb.commons.exception.MetadataException;
+import org.apache.iotdb.db.exception.StorageEngineException;
+import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.AbstractCompactionTest;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.tablemodel.CompactionTableModelTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileManager;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.DataRegionTableSizeQueryContext;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCache;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TableDiskUsageCacheReader;
+import
org.apache.iotdb.db.storageengine.dataregion.utils.tableDiskUsageCache.TimePartitionTableSizeQueryContext;
+
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Tests {@link TableDiskUsageCacheReader} against a mocked {@link DataRegion} (region id 0,
+ * database "test") backed by a real {@link TsFileManager}.
+ *
+ * <p>Every test file is written by {@link #prepareFile(int)} with four tables ({@code table0} to
+ * {@code table3}), while the queries only request {@code table1} and {@code table2}; the other
+ * two tables must therefore never show up in the results.
+ */
+public class TableDiskUsageTest extends AbstractCompactionTest {
+
+  /** Tables requested from the reader; every other table in the files must be ignored. */
+  private static final String[] REQUESTED_TABLES = {"table1", "table2"};
+
+  /** Per-table size written into the cache by the tests that pre-populate it. */
+  private static final long CACHED_TABLE_SIZE = 10000000L;
+
+  private DataRegion mockDataRegion;
+  private TsFileManager mockTsFileManager;
+
+  /**
+   * Restarts the singleton cache if needed, then registers a mocked region 0 of database "test"
+   * with both the {@link StorageEngine} and the {@link TableDiskUsageCache}.
+   */
+  @Before
+  public void setUp()
+      throws IOException, WriteProcessException, MetadataException, InterruptedException {
+    super.setUp();
+    // Clears the cache's stop/failure flags and restarts its executor if it has terminated.
+    TableDiskUsageCache.getInstance().ensureRunning();
+    mockDataRegion = Mockito.mock(DataRegion.class);
+    Mockito.when(mockDataRegion.getDatabaseName()).thenReturn("test");
+    Mockito.when(mockDataRegion.getDataRegionId()).thenReturn(new DataRegionId(0));
+    Mockito.when(mockDataRegion.getDataRegionIdString()).thenReturn("0");
+    StorageEngine.getInstance().setDataRegion(new DataRegionId(0), mockDataRegion);
+    mockTsFileManager = new TsFileManager("test", "0", "");
+    Mockito.when(mockDataRegion.getTsFileManager()).thenReturn(mockTsFileManager);
+    TableDiskUsageCache.getInstance().registerRegion(mockDataRegion);
+  }
+
+  /** Runs the parent clean-up and always removes the mocked region from the storage engine. */
+  @After
+  public void tearDown() throws IOException, StorageEngineException {
+    try {
+      super.tearDown();
+    } finally {
+      // Remove the mock even if the parent clean-up fails, so it cannot leak into other tests.
+      StorageEngine.getInstance().deleteDataRegion(new DataRegionId(0));
+    }
+  }
+
+  /** No cache entry exists for the file: each requested table must report a positive size. */
+  @Test
+  public void test1() throws Exception {
+    TsFileResource resource = prepareFile(4);
+    mockTsFileManager.add(resource, true);
+
+    // Long.MAX_VALUE as the upper bound means "no upper limit" for this case.
+    assertReportedSizes(queryRequestedTables(), 0L, Long.MAX_VALUE);
+  }
+
+  /**
+   * One managed file has a cache entry of {@link #CACHED_TABLE_SIZE} per requested table and a
+   * second managed file has none: each reported size must exceed the cached value.
+   */
+  @Test
+  public void test2() throws Exception {
+    // cached
+    TsFileResource cachedResource = prepareFile(4);
+    mockTsFileManager.add(cachedResource, true);
+    writeCacheEntry(cachedResource);
+
+    TsFileResource uncachedResource = prepareFile(4);
+    mockTsFileManager.add(uncachedResource, true);
+
+    assertReportedSizes(queryRequestedTables(), CACHED_TABLE_SIZE, Long.MAX_VALUE);
+  }
+
+  /**
+   * A cache entry exists for a file that is not in the {@link TsFileManager} (i.e. deleted): its
+   * cached size must not be counted, so each reported size stays below {@link #CACHED_TABLE_SIZE}.
+   */
+  @Test
+  public void test3() throws Exception {
+    // deleted
+    TsFileResource deletedResource = prepareFile(4);
+    writeCacheEntry(deletedResource);
+
+    TsFileResource managedResource = prepareFile(4);
+    mockTsFileManager.add(managedResource, true);
+
+    assertReportedSizes(queryRequestedTables(), 0L, CACHED_TABLE_SIZE);
+  }
+
+  /** Writes a cache entry of {@link #CACHED_TABLE_SIZE} per requested table for the given file. */
+  private void writeCacheEntry(TsFileResource resource) {
+    Map<String, Long> tableSizeMap = new HashMap<>();
+    for (String table : REQUESTED_TABLES) {
+      tableSizeMap.put(table, CACHED_TABLE_SIZE);
+    }
+    TableDiskUsageCache.getInstance()
+        .write(mockDataRegion.getDatabaseName(), resource.getTsFileID(), tableSizeMap);
+  }
+
+  /**
+   * Drives every stage of a {@link TableDiskUsageCacheReader} over the mocked region for time
+   * partition 0, requesting only {@link #REQUESTED_TABLES}.
+   *
+   * <p>Each stage is asserted to return {@code true} on its first call.
+   *
+   * @return the query context that the reader filled in
+   */
+  private DataRegionTableSizeQueryContext queryRequestedTables() throws Exception {
+    DataRegionTableSizeQueryContext context = new DataRegionTableSizeQueryContext(false);
+    // only query table1 and table2
+    Map<String, Long> timePartitionTableSizeMap = new HashMap<>();
+    for (String table : REQUESTED_TABLES) {
+      timePartitionTableSizeMap.put(table, 0L);
+    }
+    context.addTimePartition(0, new TimePartitionTableSizeQueryContext(timePartitionTableSizeMap));
+    try (TableDiskUsageCacheReader reader =
+        new TableDiskUsageCacheReader(mockDataRegion, context, false)) {
+      Assert.assertTrue(reader.prepareCacheReader(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.loadObjectFileTableSizeCache(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.prepareCachedTsFileIDKeys(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.checkAllFilesInTsFileManager(System.currentTimeMillis(), Long.MAX_VALUE));
+      Assert.assertTrue(
+          reader.readCacheValueFilesAndUpdateResultMap(System.currentTimeMillis(), Long.MAX_VALUE));
+    }
+    return context;
+  }
+
+  /**
+   * Asserts that every reported table is one of {@link #REQUESTED_TABLES} and that its size lies
+   * strictly between the two bounds.
+   *
+   * @param context query context filled in by the reader
+   * @param minExclusive every reported size must be greater than this value
+   * @param maxExclusive every reported size must be smaller than this value
+   */
+  private static void assertReportedSizes(
+      DataRegionTableSizeQueryContext context, long minExclusive, long maxExclusive) {
+    for (TimePartitionTableSizeQueryContext timePartitionContext :
+        context.getTimePartitionTableSizeQueryContextMap().values()) {
+      for (Map.Entry<String, Long> entry :
+          timePartitionContext.getTableSizeResultMap().entrySet()) {
+        String tableName = entry.getKey();
+        long size = entry.getValue();
+        // The files also contain table0 and table3; neither was requested.
+        Assert.assertTrue(
+            "unexpected table in result: " + tableName,
+            Arrays.asList(REQUESTED_TABLES).contains(tableName));
+        Assert.assertTrue(
+            "size of " + tableName + " out of range: " + size,
+            size > minExclusive && size < maxExclusive);
+      }
+    }
+  }
+
+  /**
+   * Creates a TsFile containing tables {@code table0} .. {@code table<tableNum - 1>}; each table
+   * gets one device {@code d1} with aligned series {@code s0} and {@code s1} over the time range
+   * [10, 12].
+   *
+   * @param tableNum number of tables to write
+   * @return the resource of the written file (not yet added to any TsFileManager)
+   */
+  private TsFileResource prepareFile(int tableNum) throws IOException {
+    TsFileResource resource = createEmptyFileAndResource(true);
+    try (CompactionTableModelTestFileWriter writer =
+        new CompactionTableModelTestFileWriter(resource)) {
+      for (int i = 0; i < tableNum; i++) {
+        writer.registerTableSchema("table" + i, Collections.singletonList("device"));
+        writer.startChunkGroup("table" + i, Collections.singletonList("d1"));
+        writer.generateSimpleAlignedSeriesToCurrentDeviceWithNullValue(
+            Arrays.asList("s0", "s1"),
+            new TimeRange[][][] {new TimeRange[][] {new TimeRange[] {new TimeRange(10, 12)}}},
+            TSEncoding.PLAIN,
+            CompressionType.LZ4,
+            Arrays.asList(false, false));
+        writer.endChunkGroup();
+      }
+      writer.endFile();
+    }
+    return resource;
+  }
+}