This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d4074920d44 Load: modify TsFile version check to enable V3 load to V4 (#13400)
d4074920d44 is described below

commit d4074920d44ff14a328187a8a4c843465c3a1a54
Author: Zikun Ma <[email protected]>
AuthorDate: Wed Sep 11 12:42:30 2024 +0800

    Load: modify TsFile version check to enable V3 load to V4 (#13400)
---
 .../java/org/apache/iotdb/db/conf/IoTDBConfig.java | 12 ++++++
 .../org/apache/iotdb/db/conf/IoTDBDescriptor.java  |  5 +++
 .../plan/analyze/LoadTsFileAnalyzer.java           | 44 +++++++++++++++++-----
 .../load/memory/LoadTsFileAbstractMemoryBlock.java |  4 ++
 .../memory/LoadTsFileAnalyzeSchemaMemoryBlock.java | 28 +++++++++++---
 .../memory/LoadTsFileDataCacheMemoryBlock.java     |  7 ++++
 .../load/memory/LoadTsFileMemoryManager.java       | 28 +++++++++++++-
 .../load/splitter/TsFileSplitter.java              | 13 ++++++-
 .../apache/iotdb/db/utils/ModificationUtils.java   | 14 ++++---
 9 files changed, 130 insertions(+), 25 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
index 4c4b0e824d0..08245360c4b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java
@@ -1136,6 +1136,8 @@ public class IoTDBConfig {
   private long loadTsFileAnalyzeSchemaMemorySizeInBytes =
       0L; // 0 means that the decision will be adaptive based on the number of 
sequences
 
+  private int loadTsFileMaxDeviceCountToUseDeviceTimeIndex = 10000;
+
   private long loadMemoryAllocateRetryIntervalMs = 1000L;
   private int loadMemoryAllocateMaxRetries = 5;
 
@@ -3958,6 +3960,16 @@ public class IoTDBConfig {
     this.loadTsFileAnalyzeSchemaMemorySizeInBytes = 
loadTsFileAnalyzeSchemaMemorySizeInBytes;
   }
 
+  public int getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex() {
+    return loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
+  }
+
+  public void setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex(
+      int loadTsFileMaxDeviceCountToUseDeviceTimeIndex) {
+    this.loadTsFileMaxDeviceCountToUseDeviceTimeIndex =
+        loadTsFileMaxDeviceCountToUseDeviceTimeIndex;
+  }
+
   public long getLoadMemoryAllocateRetryIntervalMs() {
     return loadMemoryAllocateRetryIntervalMs;
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
index 58de8530dc4..27532b56ae4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java
@@ -2248,6 +2248,11 @@ public class IoTDBDescriptor {
             properties.getProperty(
                 "load_tsfile_analyze_schema_memory_size_in_bytes",
                 
String.valueOf(conf.getLoadTsFileAnalyzeSchemaMemorySizeInBytes()))));
+    conf.setLoadTsFileMaxDeviceCountToUseDeviceTimeIndex(
+        Integer.parseInt(
+            properties.getProperty(
+                "load_tsfile_max_device_count_to_use_device_index",
+                
String.valueOf(conf.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex()))));
     conf.setLoadCleanupTaskExecutionDelayTimeSeconds(
         Long.parseLong(
             properties.getProperty(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
index 34f99c64388..05ff28b6a8c 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/LoadTsFileAnalyzer.java
@@ -101,6 +101,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
 public class LoadTsFileAnalyzer implements AutoCloseable {
@@ -110,12 +111,15 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
   private static final IClientManager<ConfigRegionId, ConfigNodeClient> 
CONFIG_NODE_CLIENT_MANAGER =
       ConfigNodeClientManager.getInstance();
   private static final int BATCH_FLUSH_TIME_SERIES_NUMBER;
+  private static final int MAX_DEVICE_COUNT_TO_USE_DEVICE_TIME_INDEX;
   private static final long ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES;
   private static final long FLUSH_ALIGNED_CACHE_MEMORY_SIZE_IN_BYTES;
 
   static {
     final IoTDBConfig CONFIG = IoTDBDescriptor.getInstance().getConfig();
     BATCH_FLUSH_TIME_SERIES_NUMBER = 
CONFIG.getLoadTsFileAnalyzeSchemaBatchFlushTimeSeriesNumber();
+    MAX_DEVICE_COUNT_TO_USE_DEVICE_TIME_INDEX =
+        CONFIG.getLoadTsFileMaxDeviceCountToUseDeviceTimeIndex();
     ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES =
         CONFIG.getLoadTsFileAnalyzeSchemaMemorySizeInBytes() <= 0
             ? ((long) BATCH_FLUSH_TIME_SERIES_NUMBER) << 10
@@ -250,7 +254,7 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
         tsFileResource.deserialize();
       }
 
-      
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource);
+      
schemaAutoCreatorAndVerifier.setCurrentModificationsAndTimeIndex(tsFileResource,
 reader);
 
       // check if the tsfile is empty
       if (!timeseriesMetadataIterator.hasNext()) {
@@ -317,8 +321,9 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       this.schemaCache = new LoadTsFileAnalyzeSchemaCache();
     }
 
-    public void setCurrentModificationsAndTimeIndex(TsFileResource resource) 
throws IOException {
-      schemaCache.setCurrentModificationsAndTimeIndex(resource);
+    public void setCurrentModificationsAndTimeIndex(
+        TsFileResource resource, TsFileSequenceReader reader) throws 
IOException {
+      schemaCache.setCurrentModificationsAndTimeIndex(resource, reader);
     }
 
     public void autoCreateAndVerify(
@@ -790,22 +795,41 @@ public class LoadTsFileAnalyzer implements AutoCloseable {
       }
     }
 
-    public void setCurrentModificationsAndTimeIndex(TsFileResource resource) 
throws IOException {
+    public void setCurrentModificationsAndTimeIndex(
+        TsFileResource resource, TsFileSequenceReader reader) throws 
IOException {
       clearModificationsAndTimeIndex();
 
       currentModifications = resource.getModFile().getModifications();
       for (final Modification modification : currentModifications) {
         currentModificationsMemoryUsageSizeInBytes += ((Deletion) 
modification).getSerializedSize();
       }
+
+      // If there are too many modifications, a larger memory block is needed 
to avoid frequent
+      // flush.
+      long newMemorySize =
+          currentModificationsMemoryUsageSizeInBytes > 
ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES / 2
+              ? currentModificationsMemoryUsageSizeInBytes + 
ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES
+              : ANALYZE_SCHEMA_MEMORY_SIZE_IN_BYTES;
+      block.forceResize(newMemorySize);
       block.addMemoryUsage(currentModificationsMemoryUsageSizeInBytes);
 
-      if (resource.resourceFileExists()) {
-        currentTimeIndex = resource.getTimeIndex();
-        if (currentTimeIndex instanceof FileTimeIndex) {
-          currentTimeIndex = resource.buildDeviceTimeIndex();
+      // No need to build device time index if there are no modifications
+      if (currentModifications.size() > 0 && resource.resourceFileExists()) {
+        final AtomicInteger deviceCount = new AtomicInteger();
+        reader
+            .getAllDevicesIteratorWithIsAligned()
+            .forEachRemaining(o -> deviceCount.getAndIncrement());
+
+        // Use device time index only if the device count is less than the 
threshold, avoiding too
+        // much memory usage
+        if (deviceCount.get() < MAX_DEVICE_COUNT_TO_USE_DEVICE_TIME_INDEX) {
+          currentTimeIndex = resource.getTimeIndex();
+          if (currentTimeIndex instanceof FileTimeIndex) {
+            currentTimeIndex = resource.buildDeviceTimeIndex();
+          }
+          currentTimeIndexMemoryUsageSizeInBytes = 
currentTimeIndex.calculateRamSize();
+          block.addMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
         }
-        currentTimeIndexMemoryUsageSizeInBytes = 
currentTimeIndex.calculateRamSize();
-        block.addMemoryUsage(currentTimeIndexMemoryUsageSizeInBytes);
       }
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
index f0df55a9f63..bca1591b1ef 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAbstractMemoryBlock.java
@@ -42,6 +42,10 @@ public abstract class LoadTsFileAbstractMemoryBlock implements AutoCloseable {
 
   public abstract void reduceMemoryUsage(long memoryInBytes);
 
+  abstract long getMemoryUsageInBytes();
+
+  public abstract void forceResize(long newSizeInBytes);
+
   /**
    * Release all memory of this block.
    *
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
index c7add4b446f..bd7ff8c2df4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileAnalyzeSchemaMemoryBlock.java
@@ -34,7 +34,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
   private static final Logger LOGGER =
       LoggerFactory.getLogger(LoadTsFileAnalyzeSchemaMemoryBlock.class);
 
-  private final long totalMemorySizeInBytes;
+  private long totalMemorySizeInBytes;
   private final AtomicLong memoryUsageInBytes;
 
   LoadTsFileAnalyzeSchemaMemoryBlock(long totalMemorySizeInBytes) {
@@ -45,12 +45,12 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
   }
 
   @Override
-  public boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
+  public synchronized boolean hasEnoughMemory(long memoryTobeAddedInBytes) {
     return memoryUsageInBytes.get() + memoryTobeAddedInBytes <= 
totalMemorySizeInBytes;
   }
 
   @Override
-  public void addMemoryUsage(long memoryInBytes) {
+  public synchronized void addMemoryUsage(long memoryInBytes) {
     memoryUsageInBytes.addAndGet(memoryInBytes);
 
     MetricService.getInstance()
@@ -63,7 +63,7 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
   }
 
   @Override
-  public void reduceMemoryUsage(long memoryInBytes) {
+  public synchronized void reduceMemoryUsage(long memoryInBytes) {
     if (memoryUsageInBytes.addAndGet(-memoryInBytes) < 0) {
       LOGGER.warn("{} has reduce memory usage to negative", this);
     }
@@ -78,7 +78,25 @@ public class LoadTsFileAnalyzeSchemaMemoryBlock extends LoadTsFileAbstractMemory
   }
 
   @Override
-  protected void releaseAllMemory() {
+  synchronized long getMemoryUsageInBytes() {
+    return memoryUsageInBytes.get();
+  }
+
+  synchronized long getTotalMemorySizeInBytes() {
+    return totalMemorySizeInBytes;
+  }
+
+  synchronized void setTotalMemorySizeInBytes(long totalMemorySizeInBytes) {
+    this.totalMemorySizeInBytes = totalMemorySizeInBytes;
+  }
+
+  @Override
+  public synchronized void forceResize(long newSizeInBytes) {
+    MEMORY_MANAGER.forceResize(this, newSizeInBytes);
+  }
+
+  @Override
+  protected synchronized void releaseAllMemory() {
     if (memoryUsageInBytes.get() != 0) {
       LOGGER.warn(
           "Try to release memory from a memory block {} which has not released 
all memory", this);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
index e0709cece9e..aefdf84fc0f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileDataCacheMemoryBlock.java
@@ -89,6 +89,12 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
     memoryUsageInBytes.addAndGet(-memoryInBytes);
   }
 
+  @Override
+  public void forceResize(long newSizeInBytes) {
+    throw new UnsupportedOperationException(
+        "resize is not supported for LoadTsFileDataCacheMemoryBlock");
+  }
+
   @Override
   protected void releaseAllMemory() {
     if (memoryUsageInBytes.get() != 0) {
@@ -125,6 +131,7 @@ public class LoadTsFileDataCacheMemoryBlock extends LoadTsFileAbstractMemoryBloc
     return referenceCount.get();
   }
 
+  @Override
   long getMemoryUsageInBytes() {
     return memoryUsageInBytes.get();
   }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
index be6e8dcef97..ab6ba1e77fe 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/memory/LoadTsFileMemoryManager.java
@@ -44,7 +44,7 @@ public class LoadTsFileMemoryManager {
   private final AtomicLong usedMemorySizeInBytes = new AtomicLong(0);
   private LoadTsFileDataCacheMemoryBlock dataCacheMemoryBlock;
 
-  private synchronized void forceAllocatedFromQuery(long sizeInBytes)
+  private synchronized void forceAllocateFromQuery(long sizeInBytes)
       throws LoadRuntimeOutOfMemoryException {
     for (int i = 0; i < MEMORY_ALLOCATE_MAX_RETRIES; i++) {
       // allocate memory from queryEngine
@@ -90,7 +90,7 @@ public class LoadTsFileMemoryManager {
   public synchronized LoadTsFileAnalyzeSchemaMemoryBlock 
allocateAnalyzeSchemaMemoryBlock(
       long sizeInBytes) throws LoadRuntimeOutOfMemoryException {
     try {
-      forceAllocatedFromQuery(sizeInBytes);
+      forceAllocateFromQuery(sizeInBytes);
     } catch (LoadRuntimeOutOfMemoryException e) {
       if (dataCacheMemoryBlock != null && 
dataCacheMemoryBlock.doShrink(sizeInBytes)) {
         return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
@@ -100,6 +100,30 @@ public class LoadTsFileMemoryManager {
     return new LoadTsFileAnalyzeSchemaMemoryBlock(sizeInBytes);
   }
 
+  /**
+   * Resize the memory block to the new size.
+   *
+   * @throws LoadRuntimeOutOfMemoryException if failed to allocate enough 
memory
+   */
+  synchronized void forceResize(LoadTsFileAnalyzeSchemaMemoryBlock 
memoryBlock, long newSizeInBytes)
+      throws LoadRuntimeOutOfMemoryException {
+    if (memoryBlock.getTotalMemorySizeInBytes() >= newSizeInBytes) {
+      releaseToQuery(memoryBlock.getTotalMemorySizeInBytes() - newSizeInBytes);
+      memoryBlock.setTotalMemorySizeInBytes(newSizeInBytes);
+      return;
+    }
+
+    long bytesNeeded = newSizeInBytes - 
memoryBlock.getTotalMemorySizeInBytes();
+    try {
+      forceAllocateFromQuery(bytesNeeded);
+    } catch (LoadRuntimeOutOfMemoryException e) {
+      if (dataCacheMemoryBlock == null || 
!dataCacheMemoryBlock.doShrink(bytesNeeded)) {
+        throw e;
+      }
+    }
+    memoryBlock.setTotalMemorySizeInBytes(newSizeInBytes);
+  }
+
   public synchronized LoadTsFileDataCacheMemoryBlock 
allocateDataCacheMemoryBlock()
       throws LoadRuntimeOutOfMemoryException {
     if (dataCacheMemoryBlock == null) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
index 8d6919062ca..f2536f94568 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/splitter/TsFileSplitter.java
@@ -389,8 +389,17 @@ public class TsFileSplitter {
     }
 
     byte versionNumber = reader.readVersionNumber();
-    if (versionNumber != TSFileConfig.VERSION_NUMBER) {
-      logger.error("the file's Version Number is incorrect, file path: {}", 
reader.getFileName());
+    if (versionNumber < TSFileConfig.VERSION_NUMBER) {
+      if (versionNumber == TSFileConfig.VERSION_NUMBER_V3 && 
TSFileConfig.VERSION_NUMBER == 4) {
+        logger.info(
+            "try to load TsFile V3 into current version (V4), file path: {}", 
reader.getFileName());
+      } else {
+        logger.error("the file's Version Number is too old, file path: {}", 
reader.getFileName());
+        return false;
+      }
+    } else if (versionNumber > TSFileConfig.VERSION_NUMBER) {
+      logger.error(
+          "the file's Version Number is higher than current, file path: {}", 
reader.getFileName());
       return false;
     }
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
index 9e7b6f7ecb8..6d10c731e7e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/ModificationUtils.java
@@ -19,10 +19,8 @@
 
 package org.apache.iotdb.db.utils;
 
-import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.path.MeasurementPath;
-import org.apache.iotdb.commons.path.PartialPath;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.selector.impl.SettleSelectorImpl;
 import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
@@ -38,6 +36,8 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+import static 
org.apache.iotdb.commons.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+
 public class ModificationUtils {
 
   private ModificationUtils() {
@@ -194,9 +194,10 @@ public class ModificationUtils {
   public static boolean isDeviceDeletedByMods(
       Collection<Modification> modifications, IDeviceID device, long 
startTime, long endTime)
       throws IllegalPathException {
+    final MeasurementPath deviceWithWildcard = new MeasurementPath(device, 
ONE_LEVEL_PATH_WILDCARD);
     for (Modification modification : modifications) {
-      PartialPath path = modification.getPath();
-      if (path.include(new MeasurementPath(device, 
IoTDBConstant.ONE_LEVEL_PATH_WILDCARD))
+      MeasurementPath path = modification.getPath();
+      if (path.matchFullPath(deviceWithWildcard)
           && ((Deletion) modification).getTimeRange().contains(startTime, 
endTime)) {
         return true;
       }
@@ -211,9 +212,10 @@ public class ModificationUtils {
       long startTime,
       long endTime)
       throws IllegalPathException {
+    final MeasurementPath measurementPath = new MeasurementPath(device, 
timeseriesId);
     for (Modification modification : modifications) {
-      PartialPath path = modification.getPath();
-      if (path.include(new MeasurementPath(device, timeseriesId))
+      MeasurementPath path = modification.getPath();
+      if (path.matchFullPath(measurementPath)
           && ((Deletion) modification).getTimeRange().contains(startTime, 
endTime)) {
         return true;
       }

Reply via email to