This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch update_last_cache_in_load
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 85fdf8cb6523b3df167e3cb8ad220e8fe9653877
Author: Tian Jiang <[email protected]>
AuthorDate: Thu May 29 10:35:47 2025 +0800

    Add option to cache last values when loading TsFiles (cache_last_values_for_load)
---
 .../it/env/cluster/config/MppDataNodeConfig.java   |   6 +
 .../it/env/remote/config/RemoteDataNodeConfig.java |   5 +
 .../apache/iotdb/itbase/env/DataNodeConfig.java    |   2 +
 .../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java   | 189 +++++++++++++--------
 .../apache/iotdb/consensus/iot/IoTConsensus.java   |   2 +-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |   5 +
 .../db/storageengine/dataregion/DataRegion.java    |  41 +++--
 .../db/storageengine/load/LoadTsFileManager.java   |   4 +-
 8 files changed, 171 insertions(+), 83 deletions(-)

diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
index 469881785e1..58ac4d596bc 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/config/MppDataNodeConfig.java
@@ -113,4 +113,10 @@ public class MppDataNodeConfig extends MppBaseConfig implements DataNodeConfig {
     setProperty("last_cache_operation_on_load", strategyName);
     return this;
   }
+
+  @Override
+  public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
+    setProperty("cache_last_values_for_load", String.valueOf(cacheLastValuesForLoad));
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
index f1f5146e060..63f50d15958 100644
--- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/config/RemoteDataNodeConfig.java
@@ -73,4 +73,9 @@ public class RemoteDataNodeConfig implements DataNodeConfig {
   public DataNodeConfig setLoadLastCacheStrategy(String strategyName) {
     return this;
   }
+
+  @Override
+  public DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
+    return this;
+  }
 }
diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
index ce4993eeb32..353fdef7f25 100644
--- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
+++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/DataNodeConfig.java
@@ -43,4 +43,6 @@ public interface DataNodeConfig {
   DataNodeConfig setMqttPayloadFormatter(String mqttPayloadFormatter);
 
   DataNodeConfig setLoadLastCacheStrategy(String strategyName);
+
+  DataNodeConfig setCacheLastValuesForLoad(boolean cacheLastValuesForLoad);
 }
diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
index 9b54a9ea724..bcfea6db44c 100644
--- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
@@ -106,7 +106,8 @@ public class IoTDBLoadLastCacheIT {
     EnvFactory.getEnv()
         .getConfig()
         .getDataNodeConfig()
-        .setLoadLastCacheStrategy(lastCacheLoadStrategy.name());
+        .setLoadLastCacheStrategy(lastCacheLoadStrategy.name())
+        .setCacheLastValuesForLoad(true);
     EnvFactory.getEnv().initClusterEnvironment();
   }
 
@@ -360,11 +361,12 @@ public class IoTDBLoadLastCacheIT {
     private final List<String> columnNames;
     private final List<TSDataType> dataTypes;
 
-    public PerformanceSchemas(String database, String tableName, int measurementNum) {
+    public PerformanceSchemas(
+        String database, String tableName, int measurementNum, int blobMeasurementNum) {
       this.database = database;
-      List<ColumnSchema> columnSchemas = new ArrayList<>(measurementNum);
-      columnNames = new ArrayList<>(measurementNum);
-      dataTypes = new ArrayList<>(measurementNum);
+      List<ColumnSchema> columnSchemas = new ArrayList<>(measurementNum + blobMeasurementNum);
+      columnNames = new ArrayList<>(measurementNum + blobMeasurementNum);
+      dataTypes = new ArrayList<>(measurementNum + blobMeasurementNum);
 
       columnSchemas.add(new ColumnSchema("device_id", TSDataType.STRING, ColumnCategory.TAG));
       columnNames.add("device_id");
@@ -374,6 +376,12 @@ public class IoTDBLoadLastCacheIT {
         columnNames.add("s" + i);
         dataTypes.add(TSDataType.INT64);
       }
+      for (int i = 0; i < blobMeasurementNum; i++) {
+        columnSchemas.add(
+            new ColumnSchema("s" + (measurementNum + i), TSDataType.BLOB, ColumnCategory.FIELD));
+        columnNames.add("s" + (measurementNum + i));
+        dataTypes.add(TSDataType.BLOB);
+      }
       tableSchema = new TableSchema(tableName, columnSchemas);
     }
   }
@@ -381,10 +389,12 @@ public class IoTDBLoadLastCacheIT {
   private void generateAndLoadOne(
       int deviceCnt,
       int measurementCnt,
+      int blobMeasurementCnt,
       int pointCnt,
       int offset,
       PerformanceSchemas schemas,
-      int fileNum)
+      int fileNum,
+      Statement statement)
       throws Exception {
     File file = new File("target" + File.separator + fileNum + ".tsfile");
     try (ITsFileWriter tsFileWriter =
@@ -398,50 +408,64 @@ public class IoTDBLoadLastCacheIT {
           for (int k = 0; k < measurementCnt; k++) {
             tablet.addValue(rowIndex, k + 1, (long) j + offset);
           }
+          for (int k = 0; k < blobMeasurementCnt; k++) {
+            tablet.addValue(rowIndex, k + 1 + measurementCnt, String.valueOf(j + offset));
+          }
           rowIndex++;
         }
       }
       tsFileWriter.write(tablet);
     }
 
-    try (final Connection connection = EnvFactory.getEnv().getTableConnection();
-        final Statement statement = connection.createStatement()) {
-      statement.execute("USE " + schemas.database);
-      statement.execute(
-          String.format(
-              "load '%s' with ('database-name'='%s')", file.getAbsolutePath(), schemas.database));
-    }
+    statement.execute(
+        String.format(
+            "load '%s' with ('database-name'='%s')", file.getAbsolutePath(), schemas.database));
+
     file.delete();
   }
 
   private void generateAndLoadAll(
-      int deviceCnt, int measurementCnt, int pointCnt, PerformanceSchemas schemas, int fileNum)
+      int deviceCnt,
+      int measurementCnt,
+      int blobMeasurementCnt,
+      int pointCnt,
+      PerformanceSchemas schemas,
+      int fileNum)
       throws Exception {
-    for (int i = 0; i < fileNum; i++) {
-      generateAndLoadOne(deviceCnt, measurementCnt, pointCnt, pointCnt * i, schemas, fileNum);
-    }
-  }
-
-  private long queryLastOnce(int deviceNum, int measurementNum, PerformanceSchemas schemas)
-      throws SQLException {
     try (final Connection connection = EnvFactory.getEnv().getTableConnection();
         final Statement statement = connection.createStatement()) {
       statement.execute("USE " + schemas.database);
 
-      try (final ResultSet resultSet =
-          statement.executeQuery(
-              String.format(
-                  "select last(%s) from %s where device_id='%s'",
-                  "s" + measurementNum, schemas.tableSchema.getTableName(), "d" + deviceNum))) {
-        if (resultSet.next()) {
-          return resultSet.getLong("_col0");
-        } else {
-          return -1;
-        }
-      } catch (SQLException e) {
-        if (!e.getMessage().contains("does not exist")) {
-          throw e;
-        }
+      for (int i = 0; i < fileNum; i++) {
+        generateAndLoadOne(
+            deviceCnt,
+            measurementCnt,
+            blobMeasurementCnt,
+            pointCnt,
+            pointCnt * i,
+            schemas,
+            fileNum,
+            statement);
+      }
+    }
+  }
+
+  private long queryLastOnce(
+      int deviceNum, int measurementNum, PerformanceSchemas schemas, Statement statement)
+      throws SQLException {
+    try (final ResultSet resultSet =
+        statement.executeQuery(
+            String.format(
+                "select last(time),last_by(%s, time) from %s where device_id='%s'",
+                "s" + measurementNum, schemas.tableSchema.getTableName(), "d" + deviceNum))) {
+      if (resultSet.next()) {
+        return resultSet.getLong("_col0");
+      } else {
+        return -1;
+      }
+    } catch (SQLException e) {
+      if (!e.getMessage().contains("does not exist")) {
+        throw e;
       }
     }
     return -1;
@@ -460,29 +484,36 @@ public class IoTDBLoadLastCacheIT {
     long totalStart = System.currentTimeMillis();
     List<Long> timeConsumptions = new ArrayList<>();
 
-    while (true) {
-      int deviceNum = random.nextInt(deviceCnt);
-      int measurementNum = random.nextInt(measurementCnt);
-      rateLimiter.acquire();
-      long start = System.currentTimeMillis();
-      long result = queryLastOnce(deviceNum, measurementNum, schemas);
-      long timeConsumption = System.currentTimeMillis() - start;
-      if (result == -1) {
-        try {
-          Thread.sleep(1000);
-          continue;
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
+    try (final Connection connection = EnvFactory.getEnv().getTableConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("USE " + schemas.database);
+
+      while (true) {
+        int deviceNum = random.nextInt(deviceCnt);
+        int measurementNum = random.nextInt(measurementCnt);
+        rateLimiter.acquire();
+        long start = System.currentTimeMillis();
+        long result = queryLastOnce(deviceNum, measurementNum, schemas, statement);
+        long timeConsumption = System.currentTimeMillis() - start;
+        if (result == -1) {
+          try {
+            Thread.sleep(1000);
+            continue;
+          } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+          }
+        }
+        System.out.printf(
+            "%s: d%d.s%d %s %s%n", new Date(), deviceNum, measurementNum, result, timeConsumption);
+        timeConsumptions.add(timeConsumption);
+        if (result == (long) pointCnt * fileCnt - 1) {
+          break;
         }
-      }
-      System.out.printf("%s: %s %s%n", new Date(), result, timeConsumption);
-      timeConsumptions.add(timeConsumption);
-      if (result == (long) pointCnt * fileCnt - 1) {
-        break;
       }
     }
+
     System.out.printf(
-        "Synchronization ends after %dms%n, query latency avg %f",
+        "Synchronization ends after %dms, query latency avg %fms %n",
         System.currentTimeMillis() - totalStart,
         timeConsumptions.stream().mapToLong(i -> i).average().orElse(0.0));
   }
@@ -490,13 +521,16 @@ public class IoTDBLoadLastCacheIT {
   // @Ignore("Performance")
   @Test
   public void testTableLoadPerformance() throws Exception {
-    int deviceCnt = 1000;
+    int deviceCnt = 100;
     int measurementCnt = 100;
+    int blobMeasurementCnt = 10;
     int pointCnt = 100;
-    int fileCnt = 100;
-    int queryPerSec = 10;
+    int fileCnt = 1000;
+    int queryPerSec = 1000;
+    int queryThreadsNum = 10;
 
-    PerformanceSchemas schemas = new PerformanceSchemas("test", "test_table", measurementCnt);
+    PerformanceSchemas schemas =
+        new PerformanceSchemas("test", "test_table", measurementCnt, blobMeasurementCnt);
 
     try (final Connection connection = EnvFactory.getEnv().getTableConnection();
         final Statement statement = connection.createStatement()) {
@@ -507,28 +541,41 @@ public class IoTDBLoadLastCacheIT {
         new Thread(
             () -> {
               try {
-                generateAndLoadAll(deviceCnt, measurementCnt, pointCnt, schemas, fileCnt);
-              } catch (Exception e) {
+                generateAndLoadAll(
+                    deviceCnt, measurementCnt, blobMeasurementCnt, pointCnt, schemas, fileCnt);
+              } catch (Throwable e) {
                 e.printStackTrace();
               }
             });
 
     RateLimiter rateLimiter = RateLimiter.create(queryPerSec);
-    Thread queryThread =
-        new Thread(
-            () -> {
-              try {
-                queryAll(deviceCnt, measurementCnt, pointCnt, fileCnt, schemas, rateLimiter);
-              } catch (SQLException e) {
-                e.printStackTrace();
-              }
-            });
+    List<Thread> queryThreads = new ArrayList<>(queryThreadsNum);
+    for (int i = 0; i < queryThreadsNum; i++) {
+      Thread queryThread =
+          new Thread(
+              () -> {
+                try {
+                  queryAll(
+                      deviceCnt,
+                      measurementCnt + blobMeasurementCnt,
+                      pointCnt,
+                      fileCnt,
+                      schemas,
+                      rateLimiter);
+                } catch (Throwable e) {
+                  e.printStackTrace();
+                }
+              });
+      queryThreads.add(queryThread);
+    }
 
     loadThread.start();
-    queryThread.start();
+    queryThreads.forEach(Thread::start);
 
     loadThread.join();
-    queryThread.join();
+    for (Thread queryThread : queryThreads) {
+      queryThread.join();
+    }
   }
 
   private static class SchemaConfig {
diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
index 959191ca2d6..d8d87d1ea03 100644
--- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
+++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/IoTConsensus.java
@@ -237,7 +237,7 @@ public class IoTConsensus implements IConsensus {
     } else if (!impl.isActive()) {
       String message =
           String.format(
-              "Peer is inactive and not ready to write request, %s, DataNode Id: %s",
+              "Peer is inactive and not ready to write request, %s, DataNode Id: %s",
               groupId.toString(), impl.getThisNode().getNodeId());
       return RpcUtils.getStatus(TSStatusCode.WRITE_PROCESS_REJECT, message);
     } else {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 47a080f5d7b..8651049d899 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1151,6 +1151,11 @@ public class IoTDBConfig {
 
   private LastCacheLoadStrategy lastCacheLoadStrategy = LastCacheLoadStrategy.UPDATE;
 
+  /**
+   * Whether to cache last values when constructing TsFileResource during LOAD. When set to true,
+   * blob series will be forcibly ignored even if lastCacheLoadStrategy =
+   * LastCacheLoadStrategy.UPDATE.
+   */
   private boolean cacheLastValuesForLoad = true;
 
   IoTDBConfig() {}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 1947eb9ba9b..8bd1ab84c84 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -70,6 +70,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowsOf
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertTabletNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalDeleteDataNode;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableSchema;
+import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.LastCacheLoadStrategy;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TableDeviceSchemaCache;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.fetcher.cache.TreeDeviceSchemaCacheManager;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
@@ -3007,6 +3008,26 @@ public class DataRegion implements IDataRegionForQuery {
           "tsfile validate failed, " + newTsFileResource.getTsFile().getName());
     }
 
+    TsFileLastReader lastReader = null;
+    final LastCacheLoadStrategy lastCacheLoadStrategy = config.getLastCacheLoadStrategy();
+    if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable()
+        && !isFromConsensus
+        && (lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE
+            || lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB)
+        && !config.isCacheLastValuesForLoad()) {
+      try {
+        // Init the reader outside of the write lock to boost performance. Blob series must
+        // still be skipped under UPDATE_NO_BLOB.
+        lastReader =
+            new TsFileLastReader(
+                newTsFileResource.getTsFilePath(),
+                true,
+                lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB);
+      } catch (IOException e) {
+        throw new LoadFileException(e);
+      }
+    }
+
     writeLock("loadNewTsFile");
     try {
       if (deleted) {
@@ -3069,7 +3090,7 @@ public class DataRegion implements IDataRegionForQuery {
                 false);
       }
 
-      onTsFileLoaded(newTsFileResource, isFromConsensus);
+      onTsFileLoaded(newTsFileResource, isFromConsensus, lastReader);
       logger.info("TsFile {} is successfully loaded in unsequence list.", newFileName);
     } catch (final DiskSpaceInsufficientException e) {
       logger.error(
@@ -3081,19 +3102,26 @@ public class DataRegion implements IDataRegionForQuery {
       throw new LoadFileException(e);
     } finally {
       writeUnlock();
+      if (lastReader != null) {
+        try {
+          lastReader.close();
+        } catch (Exception e) {
+          logger.warn("Cannot close last reader after loading TsFile {}", newTsFileResource, e);
+        }
+      }
       // TODO: do more precise control
     }
   }
 
-  private void onTsFileLoaded(TsFileResource newTsFileResource, boolean isFromConsensus)
-      throws Exception {
+  private void onTsFileLoaded(
+      TsFileResource newTsFileResource, boolean isFromConsensus, TsFileLastReader lastReader) {
     if (CommonDescriptor.getInstance().getConfig().isLastCacheEnable() && !isFromConsensus) {
-      switch (IoTDBDescriptor.getInstance().getConfig().getLastCacheLoadStrategy()) {
+      switch (config.getLastCacheLoadStrategy()) {
         case UPDATE_NO_BLOB:
-          updateLastCache(newTsFileResource, true);
+          updateLastCache(newTsFileResource, true, lastReader);
           break;
         case UPDATE:
-          updateLastCache(newTsFileResource, false);
+          updateLastCache(newTsFileResource, false, lastReader);
           break;
         case CLEAN_ALL:
           // The inner cache is shared by TreeDeviceSchemaCacheManager and
@@ -3127,8 +3155,8 @@ public class DataRegion implements IDataRegionForQuery {
   }
 
   @SuppressWarnings("java:S112")
-  private void updateLastCache(TsFileResource newTsFileResource, boolean ignoreBlob)
-      throws Exception {
+  private void updateLastCache(
+      TsFileResource newTsFileResource, boolean ignoreBlob, TsFileLastReader lastReader) {
     boolean isTableModel = isTableModelDatabase(databaseName);
 
     Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues =
@@ -3153,8 +3181,7 @@ public class DataRegion implements IDataRegionForQuery {
       return;
     }
 
-    try (TsFileLastReader lastReader =
-        new TsFileLastReader(newTsFileResource.getTsFilePath(), true, ignoreBlob)) {
+    if (lastReader != null) {
       while (lastReader.hasNext()) {
         Pair<IDeviceID, List<Pair<String, TimeValuePair>>> nextDevice = lastReader.next();
         IDeviceID deviceID = nextDevice.left;
@@ -3171,6 +3198,8 @@ public class DataRegion implements IDataRegionForQuery {
                   databaseName, deviceID, measurements, timeValuePairs, false, null);
         }
       }
+    } else {
+      TreeDeviceSchemaCacheManager.getInstance().cleanUp();
     }
   }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index b483266a544..76e315b7050 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -566,7 +566,9 @@ public class LoadTsFileManager {
                                       : chunkMetadata.getDataType(),
                                   chunkMetadata.getStatistics().getLastValue())
                               : null;
-                      return new TimeValuePair(chunkMetadata.getEndTime(), lastValue);
+                      return lastValue != null
+                          ? new TimeValuePair(chunkMetadata.getEndTime(), lastValue)
+                          : null;
                     });
           }
         }

Reply via email to