This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch update_last_cache_in_load in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 85fdf8cb6523b3df167e3cb8ad220e8fe9653877 Author: Tian Jiang <[email protected]> AuthorDate: Thu May 29 10:35:47 2025 +0800 add cache last values for load --- .../it/env/cluster/config/MppDataNodeConfig.java | 6 + .../it/env/remote/config/RemoteDataNodeConfig.java | 5 + .../apache/iotdb/itbase/env/DataNodeConfig.java | 2 + .../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java | 189 +++++++++++++-------- .../apache/iotdb/consensus/iot/IoTConsensus.java | 2 +- .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 5 + .../db/storageengine/dataregion/DataRegion.java | 41 +++-- .../db/storageengine/load/LoadTsFileManager.java | 4 +- 8 files changed, 171 insertions(+), 83 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java index 469881785e1..58ac4d596bc 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java @@ -113,4 +113,10 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig { setProperty("last_cache_operation_on_load", strategyName); return this; } + + @Override + public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) { + setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad)); + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java index f1f5146e060..63f50d15958 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java @@ -73,4 +73,9 @@ public class 
RemoteDataNodeConfig implements DataNodeConfig { public DataNodeConfig setLoadLastCacheStrategy(String strategyName) { return this; } + + @Override + public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) { + return this; + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java index ce4993eeb32..353fdef7f25 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java @@ -43,4 +43,6 @@ public interface DataNodeConfig { DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter); DataNodeConfig setLoadLastCacheStrategy(String strategyName); + + DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad); } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java index 9b54a9ea724..bcfea6db44c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java @@ -106,7 +106,8 @@ public class IoTDBLoadLastCacheIT { EnvFactory.getEnv() .getConfig() .getDataNodeConfig() - .setLoadLastCacheStrategy(lastCacheLoadStrategy.name()); + .setLoadLastCacheStrategy(lastCacheLoadStrategy.name()) + .setCacheLastValuesForLoad(true); EnvFactory.getEnv().initClusterEnvironment(); } @@ -360,11 +361,12 @@ public class IoTDBLoadLastCacheIT { private final List<String> columnNames; private final List<TSDataType> dataTypes; - public PerformanceSchemas(String database, String tableName, int measurementNum) { + public PerformanceSchemas( + String database, String tableName, int measurementNum, int blobMeasurementNum) { this.database = database; - List<ColumnSchema> columnSchemas = new 
ArrayList<>(measurementNum); - columnNames = new ArrayList<>(measurementNum); - dataTypes = new ArrayList<>(measurementNum); + List<ColumnSchema> columnSchemas = new ArrayList<>(measurementNum + blobMeasurementNum); + columnNames = new ArrayList<>(measurementNum + blobMeasurementNum); + dataTypes = new ArrayList<>(measurementNum + blobMeasurementNum); columnSchemas.add(new ColumnSchema("device_id", TSDataType.STRING, ColumnCategory.TAG)); columnNames.add("device_id"); @@ -374,6 +376,12 @@ public class IoTDBLoadLastCacheIT { columnNames.add("s" + i); dataTypes.add(TSDataType.INT64); } + for (int i = 0; i < blobMeasurementNum; i++) { + columnSchemas.add( + new ColumnSchema("s" + (measurementNum + i), TSDataType.BLOB, ColumnCategory.FIELD)); + columnNames.add("s" + (measurementNum + i)); + dataTypes.add(TSDataType.BLOB); + } tableSchema = new TableSchema(tableName, columnSchemas); } } @@ -381,10 +389,12 @@ public class IoTDBLoadLastCacheIT { private void generateAndLoadOne( int deviceCnt, int measurementCnt, + int blobMeasurementCnt, int pointCnt, int offset, PerformanceSchemas schemas, - int fileNum) + int fileNum, + Statement statement) throws Exception { File file = new File("target" + File.separator + fileNum + ".tsfile"); try (ITsFileWriter tsFileWriter = @@ -398,50 +408,64 @@ public class IoTDBLoadLastCacheIT { for (int k = 0; k < measurementCnt; k++) { tablet.addValue(rowIndex, k + 1, (long) j + offset); } + for (int k = 0; k < blobMeasurementCnt; k++) { + tablet.addValue(rowIndex, k + 1 + measurementCnt, String.valueOf(j + offset)); + } rowIndex++; } } tsFileWriter.write(tablet); } - try (final Connection connection = EnvFactory.getEnv().getTableConnection(); - final Statement statement = connection.createStatement()) { - statement.execute("USE " + schemas.database); - statement.execute( - String.format( - "load '%s' with ('database-name'='%s')", file.getAbsolutePath(), schemas.database)); - } + statement.execute( + String.format( + "load '%s' with 
('database-name'='%s')", file.getAbsolutePath(), schemas.database)); + file.delete(); } private void generateAndLoadAll( - int deviceCnt, int measurementCnt, int pointCnt, PerformanceSchemas schemas, int fileNum) + int deviceCnt, + int measurementCnt, + int blobMeasurementCnt, + int pointCnt, + PerformanceSchemas schemas, + int fileNum) throws Exception { - for (int i = 0; i < fileNum; i++) { - generateAndLoadOne(deviceCnt, measurementCnt, pointCnt, pointCnt * i, schemas, fileNum); - } - } - - private long queryLastOnce(int deviceNum, int measurementNum, PerformanceSchemas schemas) - throws SQLException { try (final Connection connection = EnvFactory.getEnv().getTableConnection(); final Statement statement = connection.createStatement()) { statement.execute("USE " + schemas.database); - try (final ResultSet resultSet = - statement.executeQuery( - String.format( - "select last(%s) from %s where device_id='%s'", - "s" + measurementNum, schemas.tableSchema.getTableName(), "d" + deviceNum))) { - if (resultSet.next()) { - return resultSet.getLong("_col0"); - } else { - return -1; - } - } catch (SQLException e) { - if (!e.getMessage().contains("does not exist")) { - throw e; - } + for (int i = 0; i < fileNum; i++) { + generateAndLoadOne( + deviceCnt, + measurementCnt, + blobMeasurementCnt, + pointCnt, + pointCnt * i, + schemas, + fileNum, + statement); + } + } + } + + private long queryLastOnce( + int deviceNum, int measurementNum, PerformanceSchemas schemas, Statement statement) + throws SQLException { + try (final ResultSet resultSet = + statement.executeQuery( + String.format( + "select last(time),last_by(%s, time) from %s where device_id='%s'", + "s" + measurementNum, schemas.tableSchema.getTableName(), "d" + deviceNum))) { + if (resultSet.next()) { + return resultSet.getLong("_col0"); + } else { + return -1; + } + } catch (SQLException e) { + if (!e.getMessage().contains("does not exist")) { + throw e; } } return -1; @@ -460,29 +484,36 @@ public class 
IoTDBLoadLastCacheIT { long totalStart = System.currentTimeMillis(); List<Long> timeConsumptions = new ArrayList<>(); - while (true) { - int deviceNum = random.nextInt(deviceCnt); - int measurementNum = random.nextInt(measurementCnt); - rateLimiter.acquire(); - long start = System.currentTimeMillis(); - long result = queryLastOnce(deviceNum, measurementNum, schemas); - long timeConsumption = System.currentTimeMillis() - start; - if (result == -1) { - try { - Thread.sleep(1000); - continue; - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); + try (final Connection connection = EnvFactory.getEnv().getTableConnection(); + final Statement statement = connection.createStatement()) { + statement.execute("USE " + schemas.database); + + while (true) { + int deviceNum = random.nextInt(deviceCnt); + int measurementNum = random.nextInt(measurementCnt); + rateLimiter.acquire(); + long start = System.currentTimeMillis(); + long result = queryLastOnce(deviceNum, measurementNum, schemas, statement); + long timeConsumption = System.currentTimeMillis() - start; + if (result == -1) { + try { + Thread.sleep(1000); + continue; + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + System.out.printf( + "%s: d%d.s%d %s %s%n", new Date(), deviceNum, measurementNum, result, timeConsumption); + timeConsumptions.add(timeConsumption); + if (result == (long) pointCnt * fileCnt - 1) { + break; } - } - System.out.printf("%s: %s %s%n", new Date(), result, timeConsumption); - timeConsumptions.add(timeConsumption); - if (result == (long) pointCnt * fileCnt - 1) { - break; } } + System.out.printf( - "Synchronization ends after %dms%n, query latency avg %f", + "Synchronization ends after %dms, query latency avg %fms %n", System.currentTimeMillis() - totalStart, timeConsumptions.stream().mapToLong(i -> i).average().orElse(0.0)); } @@ -490,13 +521,16 @@ public class IoTDBLoadLastCacheIT { // @Ignore("Performance") @Test public void 
testTableLoadPerformance() throws Exception { - int deviceCnt = 1000; + int deviceCnt = 100; int measurementCnt = 100; + int blobMeasurementCnt = 10; int pointCnt = 100; - int fileCnt = 100; - int queryPerSec = 10; + int fileCnt = 1000; + int queryPerSec = 1000; + int queryThreadsNum = 10; - PerformanceSchemas schemas = new PerformanceSchemas("test", "test_table", measurementCnt); + PerformanceSchemas schemas = + new PerformanceSchemas("test", "test_table", measurementCnt, blobMeasurementCnt); try (final Connection connection = EnvFactory.getEnv().getTableConnection(); final Statement statement = connection.createStatement()) { @@ -507,28 +541,41 @@ public class IoTDBLoadLastCacheIT { new Thread( () -> { try { - generateAndLoadAll(deviceCnt, measurementCnt, pointCnt, schemas, fileCnt); - } catch (Exception e) { + generateAndLoadAll( + deviceCnt, measurementCnt, blobMeasurementCnt, pointCnt, schemas, fileCnt); + } catch (Throwable e) { e.printStackTrace(); } }); RateLimiter rateLimiter = RateLimiter.create(queryPerSec); - Thread queryThread = - new Thread( - () -> { - try { - queryAll(deviceCnt, measurementCnt, pointCnt, fileCnt, schemas, rateLimiter); - } catch (SQLException e) { - e.printStackTrace(); - } - }); + List<Thread> queryThreads = new ArrayList<>(queryThreadsNum); + for (int i = 0; i < queryThreadsNum; i++) { + Thread queryThread = + new Thread( + () -> { + try { + queryAll( + deviceCnt, + measurementCnt + blobMeasurementCnt, + pointCnt, + fileCnt, + schemas, + rateLimiter); + } catch (Throwable e) { + e.printStackTrace(); + } + }); + queryThreads.add(queryThread); + } loadThread.start(); - queryThread.start(); + queryThreads.forEach(Thread::start); loadThread.join(); - queryThread.join(); + for (Thread queryThread : queryThreads) { + queryThread.join(); + } } private static class SchemaConfig { diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java 
b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java index 959191ca2d6..d8d87d1ea03 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java @@ -237,7 +237,7 @@ public class IoTConsensus implements IConsensus { } else if (!impl.isActive()) { String message = String.format( - "Peer is inactive and not ready to write request, %s, DataNode Id: %s", + "Peer is inactive and not ready to write request, %s, DataNode Id: %s", groupId.toString(), impl.getThisNode().getNodeId()); return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, message); } else { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java index 47a080f5d7b..8651049d899 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java @@ -1151,6 +1151,11 @@ public class IoTDBConfig { private LastCacheLoadStrategy lastCacheLoadStrategy = LastCacheLoadStrategy.UPDATE; + /** + * Whether to cache last values when constructing TsFileResource during LOAD. When set to true, + * blob series will be forcibly ignored even if lastCacheLoadStrategy = + * LastCacheLoadStrategy.UPDATE.
+ */ private boolean cacheLastValuesForLoad = true; IoTDBConfig() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index 1947eb9ba9b..8bd1ab84c84 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -70,6 +70,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOf import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode; import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache; import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager; import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache; @@ -3007,6 +3008,18 @@ public class DataRegion implements IDataRegionForQuery { "tsfile validate failed, " + newTsFileResource.getTsFile().getName()); } + TsFileLastReader lastReader = null; + if ((config.getLastCacheLoadStrategy() == LastCacheLoadStrategy.UPDATE + || config.getLastCacheLoadStrategy() == LastCacheLoadStrategy.UPDATE_NO_BLOB) + && !config.isCacheLastValuesForLoad()) { + try { + // init reader outside of lock to boost performance + lastReader = new TsFileLastReader(newTsFileResource.getTsFilePath()); + } catch (IOException e) { + throw new LoadFileException(e); + } + } + writeLock("loadNewTsFile"); try { if (deleted) { @@ -3069,7 +3082,7 @@ public class DataRegion implements IDataRegionForQuery { false); } - 
onTsFileLoaded(newTsFileResource, isFromConsensus); + onTsFileLoaded(newTsFileResource, isFromConsensus, lastReader); logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName); } catch (final DiskSpaceInsufficientException e) { logger.error( @@ -3081,19 +3094,26 @@ public class DataRegion implements IDataRegionForQuery { throw new LoadFileException(e); } finally { writeUnlock(); + if (lastReader != null) { + try { + lastReader.close(); + } catch (Exception e) { + logger.warn("Cannot close last reader after loading TsFile {}", newTsFileResource, e); + } + } // TODO: do more precise control } } - private void onTsFileLoaded(TsFileResource newTsFileResource, boolean isFromConsensus) - throws Exception { + private void onTsFileLoaded( + TsFileResource newTsFileResource, boolean isFromConsensus, TsFileLastReader lastReader) { if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() && !isFromConsensus) { - switch (IoTDBDescriptor.getInstance().getConfig().getLastCacheLoadStrategy()) { + switch (config.getLastCacheLoadStrategy()) { case UPDATE_NO_BLOB: - updateLastCache(newTsFileResource, true); + updateLastCache(newTsFileResource, true, lastReader); break; case UPDATE: - updateLastCache(newTsFileResource, false); + updateLastCache(newTsFileResource, false, lastReader); break; case CLEAN_ALL: // The inner cache is shared by TreeDeviceSchemaCacheManager and @@ -3127,8 +3147,8 @@ public class DataRegion implements IDataRegionForQuery { } @SuppressWarnings("java:S112") - private void updateLastCache(TsFileResource newTsFileResource, boolean ignoreBlob) - throws Exception { + private void updateLastCache( + TsFileResource newTsFileResource, boolean ignoreBlob, TsFileLastReader lastReader) { boolean isTableModel = isTableModelDatabase(databaseName); Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues = @@ -3153,8 +3173,7 @@ public class DataRegion implements IDataRegionForQuery { return; } - try (TsFileLastReader lastReader = - new 
TsFileLastReader(newTsFileResource.getTsFilePath(), true, ignoreBlob)) { + if (lastReader != null) { while (lastReader.hasNext()) { Pair<IDeviceID, List<Pair<String, TimeValuePair>>> nextDevice = lastReader.next(); IDeviceID deviceID = nextDevice.left; @@ -3171,6 +3190,8 @@ public class DataRegion implements IDataRegionForQuery { databaseName, deviceID, measurements, timeValuePairs, false, null); } } + } else { + TreeDeviceSchemaCacheManager.getInstance().cleanUp(); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java index b483266a544..76e315b7050 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java @@ -566,7 +566,9 @@ public class LoadTsFileManager { : chunkMetadata.getDataType(), chunkMetadata.getStatistics().getLastValue()) : null; - return new TimeValuePair(chunkMetadata.getEndTime(), lastValue); + return lastValue != null + ? new TimeValuePair(chunkMetadata.getEndTime(), lastValue) + : null; }); } }
