This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new ad1e7e1bc55 [to rc/1.3.3] Fix compaction cached time chunk flip twice 
(#13904)
ad1e7e1bc55 is described below

commit ad1e7e1bc5516c9b75ccfe75063441d7a3eb6fb1
Author: shuwenwei <[email protected]>
AuthorDate: Fri Oct 25 13:19:13 2024 +0800

    [to rc/1.3.3] Fix compaction cached time chunk flip twice (#13904)
    
    * fix cached time chunk flip twice
    
    * add ut
    
    * rewind
---
 .../executor/batch/utils/BatchCompactionPlan.java  | 17 ++++++++--
 .../estimator/AbstractCompactionEstimator.java     |  2 +-
 ...atchedAlignedSeriesFastInnerCompactionTest.java | 37 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 4 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
index bc3785852de..06473bb5cd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
@@ -19,6 +19,7 @@
 
 package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;
 
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
 
 import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -35,7 +36,7 @@ import java.util.List;
 import java.util.Map;
 
 public class BatchCompactionPlan {
-  public static final long MAX_CACHED_TIME_CHUNKS_SIZE = 2 * 1024 * 1024;
+  public static long maxCachedTimeChunksSize = 2 * 1024 * 1024;
   private final List<CompactChunkPlan> compactChunkPlans = new ArrayList<>();
   private final Map<String, Map<TimeRange, ModifiedStatus>> 
alignedPageModifiedStatusCache =
       new HashMap<>();
@@ -51,12 +52,12 @@ public class BatchCompactionPlan {
     if (chunk == null) {
       chunk = reader.readMemChunk(chunkMetadata);
     }
-    chunk.getData().flip();
+    chunk.getData().rewind();
     return chunk;
   }
 
   public void addTimeChunkToCache(String file, long offset, Chunk chunk) {
-    if (cachedTimeChunkSize >= MAX_CACHED_TIME_CHUNKS_SIZE) {
+    if (cachedTimeChunkSize >= maxCachedTimeChunksSize) {
       return;
     }
     cachedTimeChunks.put(
@@ -96,6 +97,16 @@ public class BatchCompactionPlan {
     return compactChunkPlans.isEmpty();
   }
 
+  @TestOnly
+  public static void setMaxCachedTimeChunksSize(long size) {
+    maxCachedTimeChunksSize = size;
+  }
+
+  @TestOnly
+  public static long getMaxCachedTimeChunksSize() {
+    return maxCachedTimeChunksSize;
+  }
+
   @Override
   public String toString() {
     return compactChunkPlans.toString();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
index 4a5554d91da..e07af3ca1f6 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
@@ -68,7 +68,7 @@ public abstract class AbstractCompactionEstimator {
               ((double) SystemInfo.getInstance().getMemorySizeForCompaction()
                   / 
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
                   * 
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion())
-          + BatchCompactionPlan.MAX_CACHED_TIME_CHUNKS_SIZE;
+          + BatchCompactionPlan.maxCachedTimeChunksSize;
 
   protected abstract long calculatingMetadataMemoryCost(CompactionTaskInfo 
taskInfo);
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
index 7f4b1f83bcf..f07b5de8079 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan;
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
 import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -318,6 +319,42 @@ public class BatchedAlignedSeriesFastInnerCompactionTest 
extends AbstractCompact
     validate(targetResource);
   }
 
+  @Test
+  public void testCompactionByDeserializeWithLargeTimeChunk() throws Exception 
{
+    long defaultMaxCachedTimeChunkSize = 
BatchCompactionPlan.getMaxCachedTimeChunksSize();
+    try {
+      BatchCompactionPlan.setMaxCachedTimeChunksSize(1);
+      TsFileResource unseqResource1 =
+          generateSingleAlignedSeriesFile(
+              "d0",
+              Arrays.asList("s0", "s1", "s2"),
+              new TimeRange[][] {
+                new TimeRange[] {new TimeRange(100, 200), new TimeRange(500, 
600)}
+              },
+              TSEncoding.PLAIN,
+              CompressionType.LZ4,
+              Arrays.asList(false, false, false),
+              false);
+      unseqResources.add(unseqResource1);
+
+      TsFileResource unseqResource2 =
+          generateSingleAlignedSeriesFile(
+              "d0",
+              Arrays.asList("s0", "s1", "s2"),
+              new TimeRange[] {new TimeRange(150, 550)},
+              TSEncoding.PLAIN,
+              CompressionType.LZ4,
+              Arrays.asList(false, false, false),
+              false);
+      unseqResources.add(unseqResource2);
+
+      TsFileResource targetResource = performCompaction();
+      validate(targetResource);
+    } finally {
+      
BatchCompactionPlan.setMaxCachedTimeChunksSize(defaultMaxCachedTimeChunkSize);
+    }
+  }
+
   @Test
   public void testCompactionByDeserializeWithEmptyColumn() throws Exception {
     TsFileResource unseqResource1 =

Reply via email to