This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch update_last_cache_in_load
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8f6f578c7853ca54cfc6b7857a9b9059206d6691
Author: Tian Jiang <[email protected]>
AuthorDate: Wed May 28 10:44:17 2025 +0800

    add cacheLastValuesForLoad
---
 .../apache/iotdb/db/it/IoTDBLoadLastCacheIT.java   | 185 ++++++++++++++++++++-
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java |  12 ++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |   5 +
 .../pipeconsensus/PipeConsensusReceiver.java       |  12 +-
 .../plan/analyze/load/LoadTsFileAnalyzer.java      |  10 +-
 .../db/storageengine/dataregion/DataRegion.java    |  22 +++
 .../dataregion/tsfile/TsFileResource.java          |  11 ++
 .../dataregion/utils/TsFileResourceUtils.java      |  45 ++++-
 .../file/AbstractTsFileRecoverPerformer.java       |   2 +-
 .../db/storageengine/load/LoadTsFileManager.java   |  46 +++++
 10 files changed, 338 insertions(+), 12 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
index f05683056f0..9b54a9ea724 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBLoadLastCacheIT.java
@@ -27,6 +27,7 @@ import org.apache.iotdb.itbase.category.ClusterIT;
 import org.apache.iotdb.itbase.category.LocalStandaloneIT;
 import org.apache.iotdb.jdbc.IoTDBSQLException;
 
+import com.google.common.util.concurrent.RateLimiter;
 import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.ColumnSchema;
@@ -55,13 +56,17 @@ import java.sql.Connection;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Date;
 import java.util.List;
 import java.util.Objects;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
+@SuppressWarnings({"ResultOfMethodCallIgnored", "UnstableApiUsage"})
 @RunWith(Parameterized.class)
 @Category({LocalStandaloneIT.class, ClusterIT.class})
 public class IoTDBLoadLastCacheIT {
@@ -276,8 +281,6 @@ public class IoTDBLoadLastCacheIT {
 
   @Test
   public void testTableModelLoadWithLastCache() throws Exception {
-    registerSchema();
-
     final String database = SchemaConfig.DATABASE_0;
     final String table = SchemaConfig.TABLE_0;
     final String measurement = 
SchemaConfig.MEASUREMENT_00.getMeasurementName();
@@ -350,6 +353,184 @@ public class IoTDBLoadLastCacheIT {
     }
   }
 
+  private static class PerformanceSchemas {
+
+    private final String database;
+    private final TableSchema tableSchema;
+    private final List<String> columnNames;
+    private final List<TSDataType> dataTypes;
+
+    public PerformanceSchemas(String database, String tableName, int 
measurementNum) {
+      this.database = database;
+      List<ColumnSchema> columnSchemas = new ArrayList<>(measurementNum);
+      columnNames = new ArrayList<>(measurementNum);
+      dataTypes = new ArrayList<>(measurementNum);
+
+      columnSchemas.add(new ColumnSchema("device_id", TSDataType.STRING, 
ColumnCategory.TAG));
+      columnNames.add("device_id");
+      dataTypes.add(TSDataType.STRING);
+      for (int i = 0; i < measurementNum; i++) {
+        columnSchemas.add(new ColumnSchema("s" + i, TSDataType.INT64, 
ColumnCategory.FIELD));
+        columnNames.add("s" + i);
+        dataTypes.add(TSDataType.INT64);
+      }
+      tableSchema = new TableSchema(tableName, columnSchemas);
+    }
+  }
+
+  private void generateAndLoadOne(
+      int deviceCnt,
+      int measurementCnt,
+      int pointCnt,
+      int offset,
+      PerformanceSchemas schemas,
+      int fileNum)
+      throws Exception {
+    File file = new File("target" + File.separator + fileNum + ".tsfile");
+    try (ITsFileWriter tsFileWriter =
+        new 
TsFileWriterBuilder().file(file).tableSchema(schemas.tableSchema).build()) {
+      Tablet tablet = new Tablet(schemas.columnNames, schemas.dataTypes, 
pointCnt * deviceCnt);
+      int rowIndex = 0;
+      for (int i = 0; i < deviceCnt; i++) {
+        for (int j = 0; j < pointCnt; j++) {
+          tablet.addTimestamp(rowIndex, j + offset);
+          tablet.addValue(rowIndex, 0, "d" + i);
+          for (int k = 0; k < measurementCnt; k++) {
+            tablet.addValue(rowIndex, k + 1, (long) j + offset);
+          }
+          rowIndex++;
+        }
+      }
+      tsFileWriter.write(tablet);
+    }
+
+    try (final Connection connection = 
EnvFactory.getEnv().getTableConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("USE " + schemas.database);
+      statement.execute(
+          String.format(
+              "load '%s' with ('database-name'='%s')", file.getAbsolutePath(), 
schemas.database));
+    }
+    file.delete();
+  }
+
+  private void generateAndLoadAll(
+      int deviceCnt, int measurementCnt, int pointCnt, PerformanceSchemas 
schemas, int fileNum)
+      throws Exception {
+    for (int i = 0; i < fileNum; i++) {
+      generateAndLoadOne(deviceCnt, measurementCnt, pointCnt, pointCnt * i, 
schemas, fileNum);
+    }
+  }
+
+  private long queryLastOnce(int deviceNum, int measurementNum, 
PerformanceSchemas schemas)
+      throws SQLException {
+    try (final Connection connection = 
EnvFactory.getEnv().getTableConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("USE " + schemas.database);
+
+      try (final ResultSet resultSet =
+          statement.executeQuery(
+              String.format(
+                  "select last(%s) from %s where device_id='%s'",
+                  "s" + measurementNum, schemas.tableSchema.getTableName(), 
"d" + deviceNum))) {
+        if (resultSet.next()) {
+          return resultSet.getLong("_col0");
+        } else {
+          return -1;
+        }
+      } catch (SQLException e) {
+        if (!e.getMessage().contains("does not exist")) {
+          throw e;
+        }
+      }
+    }
+    return -1;
+  }
+
+  @SuppressWarnings("BusyWait")
+  private void queryAll(
+      int deviceCnt,
+      int measurementCnt,
+      int pointCnt,
+      int fileCnt,
+      PerformanceSchemas schemas,
+      RateLimiter rateLimiter)
+      throws SQLException {
+    Random random = new Random();
+    long totalStart = System.currentTimeMillis();
+    List<Long> timeConsumptions = new ArrayList<>();
+
+    while (true) {
+      int deviceNum = random.nextInt(deviceCnt);
+      int measurementNum = random.nextInt(measurementCnt);
+      rateLimiter.acquire();
+      long start = System.currentTimeMillis();
+      long result = queryLastOnce(deviceNum, measurementNum, schemas);
+      long timeConsumption = System.currentTimeMillis() - start;
+      if (result == -1) {
+        try {
+          Thread.sleep(1000);
+          continue;
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+      System.out.printf("%s: %s %s%n", new Date(), result, timeConsumption);
+      timeConsumptions.add(timeConsumption);
+      if (result == (long) pointCnt * fileCnt - 1) {
+        break;
+      }
+    }
+    System.out.printf(
+        "Synchronization ends after %dms%n, query latency avg %f",
+        System.currentTimeMillis() - totalStart,
+        timeConsumptions.stream().mapToLong(i -> i).average().orElse(0.0));
+  }
+
+  // @Ignore("Performance")
+  @Test
+  public void testTableLoadPerformance() throws Exception {
+    int deviceCnt = 1000;
+    int measurementCnt = 100;
+    int pointCnt = 100;
+    int fileCnt = 100;
+    int queryPerSec = 10;
+
+    PerformanceSchemas schemas = new PerformanceSchemas("test", "test_table", 
measurementCnt);
+
+    try (final Connection connection = 
EnvFactory.getEnv().getTableConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute("CREATE DATABASE IF NOT EXISTS " + schemas.database);
+    }
+
+    Thread loadThread =
+        new Thread(
+            () -> {
+              try {
+                generateAndLoadAll(deviceCnt, measurementCnt, pointCnt, 
schemas, fileCnt);
+              } catch (Exception e) {
+                e.printStackTrace();
+              }
+            });
+
+    RateLimiter rateLimiter = RateLimiter.create(queryPerSec);
+    Thread queryThread =
+        new Thread(
+            () -> {
+              try {
+                queryAll(deviceCnt, measurementCnt, pointCnt, fileCnt, 
schemas, rateLimiter);
+              } catch (SQLException e) {
+                e.printStackTrace();
+              }
+            });
+
+    loadThread.start();
+    queryThread.start();
+
+    loadThread.join();
+    queryThread.join();
+  }
+
   private static class SchemaConfig {
 
     private static final String DATABASE_0 = "db";
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index adc0b70cd31..47a080f5d7b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1151,6 +1151,8 @@ public class IoTDBConfig {
 
   private LastCacheLoadStrategy lastCacheLoadStrategy = 
LastCacheLoadStrategy.UPDATE;
 
+  private boolean cacheLastValuesForLoad = true;
+
   IoTDBConfig() {}
 
   public int getMaxLogEntriesNumPerBatch() {
@@ -4079,4 +4081,14 @@ public class IoTDBConfig {
   public void setLastCacheLoadStrategy(LastCacheLoadStrategy 
lastCacheLoadStrategy) {
     this.lastCacheLoadStrategy = lastCacheLoadStrategy;
   }
+
+  public boolean isCacheLastValuesForLoad() {
+    return (lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE
+            || lastCacheLoadStrategy == LastCacheLoadStrategy.UPDATE_NO_BLOB)
+        && cacheLastValuesForLoad;
+  }
+
+  public void setCacheLastValuesForLoad(boolean cacheLastValuesForLoad) {
+    this.cacheLastValuesForLoad = cacheLastValuesForLoad;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index c927c0f4cfb..e1a9d155c98 100755
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2267,6 +2267,11 @@ public class IoTDBDescriptor {
         LastCacheLoadStrategy.valueOf(
             properties.getProperty(
                 "last_cache_operation_on_load", 
LastCacheLoadStrategy.UPDATE.name())));
+
+    conf.setCacheLastValuesForLoad(
+        Boolean.parseBoolean(
+            properties.getProperty(
+                "cache_last_values_for_load", 
String.valueOf(conf.isCacheLastValuesForLoad()))));
   }
 
   private void loadLoadTsFileHotModifiedProp(TrimProperties properties) throws 
IOException {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
index 843cc1765c9..e13d2a08297 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/receiver/protocol/pipeconsensus/PipeConsensusReceiver.java
@@ -718,7 +718,11 @@ public class PipeConsensusReceiver {
     DataRegion region =
         StorageEngine.getInstance().getDataRegion(((DataRegionId) 
consensusGroupId));
     if (region != null) {
-      TsFileResource resource = generateTsFileResource(filePath, 
progressIndex);
+      TsFileResource resource =
+          generateTsFileResource(
+              filePath,
+              progressIndex,
+              
IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad());
       region.loadNewTsFile(resource, true, false, true);
     } else {
       // Data region is null indicates that dr has been removed or migrated. 
In those cases, there
@@ -773,13 +777,13 @@ public class PipeConsensusReceiver {
                                 dataRegion, databaseName, writePointCount, 
true)));
   }
 
-  private TsFileResource generateTsFileResource(String filePath, ProgressIndex 
progressIndex)
-      throws IOException {
+  private TsFileResource generateTsFileResource(
+      String filePath, ProgressIndex progressIndex, boolean cacheLastValues) 
throws IOException {
     final File tsFile = new File(filePath);
 
     final TsFileResource tsFileResource = new TsFileResource(tsFile);
     try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(tsFile.getAbsolutePath())) {
-      TsFileResourceUtils.updateTsFileResource(reader, tsFileResource);
+      TsFileResourceUtils.updateTsFileResource(reader, tsFileResource, 
cacheLastValues);
     }
 
     tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
index 4fccee49e12..8232a71d30d 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/load/LoadTsFileAnalyzer.java
@@ -519,7 +519,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
       // Update time index no matter if resource file exists or not, because 
resource file may be
       // untrusted
-      TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata, 
tsFileResource);
+      TsFileResourceUtils.updateTsFileResource(
+          device2TimeseriesMetadata,
+          tsFileResource,
+          
IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad());
       
getOrCreateTreeSchemaVerifier().setCurrentTimeIndex(tsFileResource.getTimeIndex());
 
       if (isAutoCreateSchemaOrVerifySchemaEnabled) {
@@ -578,7 +581,10 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
 
       // Update time index no matter if resource file exists or not, because 
resource file may be
       // untrusted
-      TsFileResourceUtils.updateTsFileResource(device2TimeseriesMetadata, 
tsFileResource);
+      TsFileResourceUtils.updateTsFileResource(
+          device2TimeseriesMetadata,
+          tsFileResource,
+          
IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad());
       
getOrCreateTableSchemaCache().setCurrentTimeIndex(tsFileResource.getTimeIndex());
 
       for (IDeviceID deviceId : device2TimeseriesMetadata.keySet()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index 2d749622ce6..1947eb9ba9b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -3131,6 +3131,28 @@ public class DataRegion implements IDataRegionForQuery {
       throws Exception {
     boolean isTableModel = isTableModelDatabase(databaseName);
 
+    Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues =
+        newTsFileResource.getLastValues();
+    if (lastValues != null) {
+      for (Entry<IDeviceID, List<Pair<String, TimeValuePair>>> entry : 
lastValues.entrySet()) {
+        IDeviceID deviceID = entry.getKey();
+        String[] measurements = 
entry.getValue().stream().map(Pair::getLeft).toArray(String[]::new);
+        TimeValuePair[] timeValuePairs =
+            
entry.getValue().stream().map(Pair::getRight).toArray(TimeValuePair[]::new);
+        if (isTableModel) {
+          TableDeviceSchemaCache.getInstance()
+              .updateLastCacheIfExists(databaseName, deviceID, measurements, 
timeValuePairs);
+        } else {
+          // we do not update schema here, so aligned is not relevant
+          TreeDeviceSchemaCacheManager.getInstance()
+              .updateLastCacheIfExists(
+                  databaseName, deviceID, measurements, timeValuePairs, false, 
null);
+        }
+      }
+      newTsFileResource.setLastValues(null);
+      return;
+    }
+
     try (TsFileLastReader lastReader =
         new TsFileLastReader(newTsFileResource.getTsFilePath(), true, 
ignoreBlob)) {
       while (lastReader.hasNext()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
index 2d2fbea1e74..1beb48ee246 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/tsfile/TsFileResource.java
@@ -56,6 +56,7 @@ import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.ITimeSeriesMetadata;
 import org.apache.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.tsfile.fileSystem.fsFactory.FSFactory;
+import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.FilePathUtils;
 import org.apache.tsfile.utils.Pair;
@@ -205,6 +206,8 @@ public class TsFileResource implements PersistentResource {
   private InsertionCompactionCandidateStatus 
insertionCompactionCandidateStatus =
       InsertionCompactionCandidateStatus.NOT_CHECKED;
 
+  private Map<IDeviceID, List<Pair<String, TimeValuePair>>> lastValues;
+
   @TestOnly
   public TsFileResource() {
     this.tsFileID = new TsFileID();
@@ -1565,4 +1568,12 @@ public class TsFileResource implements 
PersistentResource {
   public void setCompactionModFile(ModificationFile compactionModFile) {
     this.compactionModFile = compactionModFile;
   }
+
+  public Map<IDeviceID, List<Pair<String, TimeValuePair>>> getLastValues() {
+    return lastValues;
+  }
+
+  public void setLastValues(Map<IDeviceID, List<Pair<String, TimeValuePair>>> 
lastValues) {
+    this.lastValues = lastValues;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
index 6ff2b605cf4..16e7939069f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/utils/TsFileResourceUtils.java
@@ -42,12 +42,14 @@ import org.apache.tsfile.file.metadata.IChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.BatchData;
 import org.apache.tsfile.read.reader.page.PageReader;
 import org.apache.tsfile.read.reader.page.TimePageReader;
 import org.apache.tsfile.read.reader.page.ValuePageReader;
 import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +64,7 @@ import java.util.Map;
 import java.util.Set;
 
 public class TsFileResourceUtils {
+
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileResourceUtils.class);
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
   private static final String VALIDATE_FAILED = "validate failed,";
@@ -409,27 +412,63 @@ public class TsFileResourceUtils {
   }
 
   public static void updateTsFileResource(
-      TsFileSequenceReader reader, TsFileResource tsFileResource) throws 
IOException {
-    updateTsFileResource(reader.getAllTimeseriesMetadata(false), 
tsFileResource);
+      TsFileSequenceReader reader, TsFileResource tsFileResource, boolean 
cacheLastValues)
+      throws IOException {
+    updateTsFileResource(reader.getAllTimeseriesMetadata(false), 
tsFileResource, cacheLastValues);
     tsFileResource.updatePlanIndexes(reader.getMinPlanIndex());
     tsFileResource.updatePlanIndexes(reader.getMaxPlanIndex());
   }
 
   public static void updateTsFileResource(
-      Map<IDeviceID, List<TimeseriesMetadata>> device2Metadata, TsFileResource 
tsFileResource) {
+      Map<IDeviceID, List<TimeseriesMetadata>> device2Metadata,
+      TsFileResource tsFileResource,
+      boolean cacheLastValue) {
     // For async recover tsfile, there might be a FileTimeIndex, we need a new 
newTimeIndex
     ITimeIndex newTimeIndex =
         tsFileResource.getTimeIndex().getTimeIndexType() == 
ITimeIndex.FILE_TIME_INDEX_TYPE
             ? config.getTimeIndexLevel().getTimeIndex()
             : tsFileResource.getTimeIndex();
+    Map<IDeviceID, List<Pair<String, TimeValuePair>>> deviceLastValues =
+        tsFileResource.getLastValues();
+    if (cacheLastValue && deviceLastValues == null) {
+      deviceLastValues = new HashMap<>(device2Metadata.size());
+    }
     for (Map.Entry<IDeviceID, List<TimeseriesMetadata>> entry : 
device2Metadata.entrySet()) {
+      List<Pair<String, TimeValuePair>> seriesLastValues = null;
+      if (cacheLastValue) {
+        seriesLastValues = new ArrayList<>(entry.getValue().size());
+      }
+
       for (TimeseriesMetadata timeseriesMetaData : entry.getValue()) {
         newTimeIndex.updateStartTime(
             entry.getKey(), timeseriesMetaData.getStatistics().getStartTime());
         newTimeIndex.updateEndTime(entry.getKey(), 
timeseriesMetaData.getStatistics().getEndTime());
+        if (cacheLastValue) {
+          if (timeseriesMetaData.getTsDataType() != TSDataType.BLOB) {
+            TsPrimitiveType value;
+            value =
+                TsPrimitiveType.getByType(
+                    timeseriesMetaData.getTsDataType() == TSDataType.VECTOR
+                        ? TSDataType.INT64
+                        : timeseriesMetaData.getTsDataType(),
+                    timeseriesMetaData.getStatistics().getLastValue());
+            seriesLastValues.add(
+                new Pair<>(
+                    timeseriesMetaData.getMeasurementId(),
+                    new 
TimeValuePair(timeseriesMetaData.getStatistics().getEndTime(), value)));
+          } else {
+            seriesLastValues.add(new 
Pair<>(timeseriesMetaData.getMeasurementId(), null));
+          }
+        }
+      }
+      if (cacheLastValue) {
+        deviceLastValues
+            .computeIfAbsent(entry.getKey(), deviceID -> new ArrayList<>())
+            .addAll(seriesLastValues);
       }
     }
     tsFileResource.setTimeIndex(newTimeIndex);
+    tsFileResource.setLastValues(deviceLastValues);
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
index 73621a5208e..62a6a465410 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/wal/recover/file/AbstractTsFileRecoverPerformer.java
@@ -115,7 +115,7 @@ public abstract class AbstractTsFileRecoverPerformer 
implements Closeable {
   protected void reconstructResourceFile() throws IOException {
     try (TsFileSequenceReader reader =
         new 
TsFileSequenceReader(tsFileResource.getTsFile().getAbsolutePath())) {
-      TsFileResourceUtils.updateTsFileResource(reader, tsFileResource);
+      TsFileResourceUtils.updateTsFileResource(reader, tsFileResource, false);
     }
 
     // set progress index for pipe to avoid data loss
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index d6962e2ae35..b483266a544 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -54,9 +54,13 @@ import 
org.apache.iotdb.db.storageengine.rescon.disk.strategy.DirectoryStrategyT
 import org.apache.iotdb.metrics.utils.MetricLevel;
 
 import org.apache.tsfile.common.constant.TsFileConstant;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.metadata.ChunkGroupMetadata;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.utils.TsPrimitiveType;
 import org.apache.tsfile.write.writer.TsFileIOWriter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -68,12 +72,14 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static org.apache.iotdb.db.utils.constant.SqlConstant.ROOT;
@@ -533,12 +539,52 @@ public class LoadTsFileManager {
         TsFileIOWriter writer, TsFileResource tsFileResource, ProgressIndex 
progressIndex)
         throws IOException {
       // Update time index by chunk groups still in memory
+      Map<IDeviceID, Map<String, TimeValuePair>> deviceLastValues = null;
+      if 
(IoTDBDescriptor.getInstance().getConfig().isCacheLastValuesForLoad()) {
+        deviceLastValues = new HashMap<>();
+      }
       for (final ChunkGroupMetadata chunkGroupMetadata : 
writer.getChunkGroupMetadataList()) {
         final IDeviceID device = chunkGroupMetadata.getDevice();
         for (final ChunkMetadata chunkMetadata : 
chunkGroupMetadata.getChunkMetadataList()) {
           tsFileResource.updateStartTime(device, chunkMetadata.getStartTime());
           tsFileResource.updateEndTime(device, chunkMetadata.getEndTime());
+          if (deviceLastValues != null) {
+            deviceLastValues
+                .computeIfAbsent(device, d -> new HashMap<>())
+                .compute(
+                    chunkMetadata.getMeasurementUid(),
+                    (m, oldPair) -> {
+                      if (oldPair != null && oldPair.getTimestamp() > 
chunkMetadata.getEndTime()) {
+                        return oldPair;
+                      }
+                      TsPrimitiveType lastValue =
+                          chunkMetadata.getStatistics() != null
+                                  && chunkMetadata.getDataType() != 
TSDataType.BLOB
+                              ? TsPrimitiveType.getByType(
+                                  chunkMetadata.getDataType() == 
TSDataType.VECTOR
+                                      ? TSDataType.INT64
+                                      : chunkMetadata.getDataType(),
+                                  chunkMetadata.getStatistics().getLastValue())
+                              : null;
+                      return new TimeValuePair(chunkMetadata.getEndTime(), 
lastValue);
+                    });
+          }
+        }
+      }
+      if (deviceLastValues != null) {
+        Map<IDeviceID, List<Pair<String, TimeValuePair>>> 
finalDeviceLastValues;
+        finalDeviceLastValues = new HashMap<>(deviceLastValues.size());
+        for (final Map.Entry<IDeviceID, Map<String, TimeValuePair>> entry :
+            deviceLastValues.entrySet()) {
+          final IDeviceID device = entry.getKey();
+          Map<String, TimeValuePair> lastValues = entry.getValue();
+          List<Pair<String, TimeValuePair>> pairList =
+              lastValues.entrySet().stream()
+                  .map(e -> new Pair<>(e.getKey(), e.getValue()))
+                  .collect(Collectors.toList());
+          finalDeviceLastValues.put(device, pairList);
         }
+        tsFileResource.setLastValues(finalDeviceLastValues);
       }
       tsFileResource.setStatus(TsFileResourceStatus.NORMAL);
       tsFileResource.setProgressIndex(progressIndex);

Reply via email to