This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
new ad1e7e1bc55 [to rc/1.3.3] Fix compaction cached time chunk flip twice (#13904)
ad1e7e1bc55 is described below
commit ad1e7e1bc5516c9b75ccfe75063441d7a3eb6fb1
Author: shuwenwei <[email protected]>
AuthorDate: Fri Oct 25 13:19:13 2024 +0800
[to rc/1.3.3] Fix compaction cached time chunk flip twice (#13904)
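* fix cached time chunk flip twice (the chunk's data buffer was flipped again each time the chunk was returned)
* add ut (unit test: testCompactionByDeserializeWithLargeTimeChunk)
* rewind (call rewind() instead of flip() on the chunk data buffer before returning the chunk)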
---
.../executor/batch/utils/BatchCompactionPlan.java | 17 ++++++++--
.../estimator/AbstractCompactionEstimator.java | 2 +-
...atchedAlignedSeriesFastInnerCompactionTest.java | 37 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 4 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
index bc3785852de..06473bb5cd9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/executor/batch/utils/BatchCompactionPlan.java
@@ -19,6 +19,7 @@
package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils;
+import org.apache.iotdb.commons.utils.TestOnly;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.ModifiedStatus;
import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -35,7 +36,7 @@ import java.util.List;
import java.util.Map;
public class BatchCompactionPlan {
- public static final long MAX_CACHED_TIME_CHUNKS_SIZE = 2 * 1024 * 1024;
+ public static long maxCachedTimeChunksSize = 2 * 1024 * 1024;
private final List<CompactChunkPlan> compactChunkPlans = new ArrayList<>();
private final Map<String, Map<TimeRange, ModifiedStatus>>
alignedPageModifiedStatusCache =
new HashMap<>();
@@ -51,12 +52,12 @@ public class BatchCompactionPlan {
if (chunk == null) {
chunk = reader.readMemChunk(chunkMetadata);
}
- chunk.getData().flip();
+ chunk.getData().rewind();
return chunk;
}
public void addTimeChunkToCache(String file, long offset, Chunk chunk) {
- if (cachedTimeChunkSize >= MAX_CACHED_TIME_CHUNKS_SIZE) {
+ if (cachedTimeChunkSize >= maxCachedTimeChunksSize) {
return;
}
cachedTimeChunks.put(
@@ -96,6 +97,16 @@ public class BatchCompactionPlan {
return compactChunkPlans.isEmpty();
}
+ @TestOnly
+ public static void setMaxCachedTimeChunksSize(long size) {
+ maxCachedTimeChunksSize = size;
+ }
+
+ @TestOnly
+ public static long getMaxCachedTimeChunksSize() {
+ return maxCachedTimeChunksSize;
+ }
+
@Override
public String toString() {
return compactChunkPlans.toString();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
index 4a5554d91da..e07af3ca1f6 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/selector/estimator/AbstractCompactionEstimator.java
@@ -68,7 +68,7 @@ public abstract class AbstractCompactionEstimator {
((double) SystemInfo.getInstance().getMemorySizeForCompaction()
/
IoTDBDescriptor.getInstance().getConfig().getCompactionThreadCount()
*
IoTDBDescriptor.getInstance().getConfig().getChunkMetadataSizeProportion())
- + BatchCompactionPlan.MAX_CACHED_TIME_CHUNKS_SIZE;
+ + BatchCompactionPlan.maxCachedTimeChunksSize;
protected abstract long calculatingMetadataMemoryCost(CompactionTaskInfo
taskInfo);
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
index 7f4b1f83bcf..f07b5de8079 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/inner/BatchedAlignedSeriesFastInnerCompactionTest.java
@@ -29,6 +29,7 @@ import
org.apache.iotdb.db.storageengine.dataregion.compaction.constant.Compacti
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.performer.impl.FastCompactionPerformer;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.subtask.FastCompactionTaskSummary;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.CompactionUtils;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.BatchCompactionPlan;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionCheckerUtils;
import org.apache.iotdb.db.storageengine.dataregion.modification.Deletion;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
@@ -318,6 +319,42 @@ public class BatchedAlignedSeriesFastInnerCompactionTest
extends AbstractCompact
validate(targetResource);
}
+ @Test
+ public void testCompactionByDeserializeWithLargeTimeChunk() throws Exception
{
+ long defaultMaxCachedTimeChunkSize =
BatchCompactionPlan.getMaxCachedTimeChunksSize();
+ try {
+ BatchCompactionPlan.setMaxCachedTimeChunksSize(1);
+ TsFileResource unseqResource1 =
+ generateSingleAlignedSeriesFile(
+ "d0",
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[][] {
+ new TimeRange[] {new TimeRange(100, 200), new TimeRange(500,
600)}
+ },
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, false),
+ false);
+ unseqResources.add(unseqResource1);
+
+ TsFileResource unseqResource2 =
+ generateSingleAlignedSeriesFile(
+ "d0",
+ Arrays.asList("s0", "s1", "s2"),
+ new TimeRange[] {new TimeRange(150, 550)},
+ TSEncoding.PLAIN,
+ CompressionType.LZ4,
+ Arrays.asList(false, false, false),
+ false);
+ unseqResources.add(unseqResource2);
+
+ TsFileResource targetResource = performCompaction();
+ validate(targetResource);
+ } finally {
+
BatchCompactionPlan.setMaxCachedTimeChunksSize(defaultMaxCachedTimeChunkSize);
+ }
+ }
+
@Test
public void testCompactionByDeserializeWithEmptyColumn() throws Exception {
TsFileResource unseqResource1 =