This is an automated email from the ASF dual-hosted git repository.

shuwenwei pushed a commit to branch fixBug0612
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 3a89da2b5022496517345cdf68ba8f0d86b2eac4
Author: shuwenwei <[email protected]>
AuthorDate: Mon Jun 15 16:27:29 2026 +0800

    Fix compaction writer size checkpointing
---
 .../utils/writer/AbstractCompactionWriter.java     | 156 +++++++++++++++++--
 .../writer/AbstractCrossCompactionWriter.java      |   3 +-
 .../writer/AbstractInnerCompactionWriter.java      |   4 +-
 .../writer/ReadPointCrossCompactionWriter.java     |   5 +-
 .../writer/ReadPointInnerCompactionWriter.java     |   5 +-
 .../writer/RepairUnsortedFileCompactionWriter.java |   4 +-
 .../utils/writer/AbstractCompactionWriterTest.java | 167 +++++++++++++++++++++
 7 files changed, 326 insertions(+), 18 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 8c0c37094ea..3e0bf0d7b02 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -30,7 +30,9 @@ import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri
 import 
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
 import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
 
+import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.exception.write.PageException;
 import org.apache.tsfile.file.header.PageHeader;
 import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -66,6 +68,13 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   // The index of the array corresponds to subTaskId.
   protected int[] chunkPointNumArray = new int[subTaskNum];
 
+  // Estimated total size of points written to each sub task's current chunk.
+  // The index of the array corresponds to subTaskId.
+  protected long[] writtenPointTotalSizeArray = new long[subTaskNum];
+
+  // Whether each sub task's current chunk writer contains TEXT, STRING, BLOB 
or OBJECT.
+  protected boolean[] hasVariableLengthTypeArray = new boolean[subTaskNum];
+
   // used to control the target chunk size
   protected long targetChunkSize = 
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
 
@@ -77,7 +86,12 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   @SuppressWarnings("squid:S1170")
   private final long checkPoint = (targetChunkPointNum >= 10 ? 
targetChunkPointNum : 10) / 10;
 
-  private long[] lastCheckIndexArray = new long[subTaskNum];
+  private final long[] lastCheckIndexArray = new long[subTaskNum];
+
+  // Once the estimated size of written points reaches the next check point, 
check the chunk size.
+  private final long writtenPointTotalSizeCheckPoint = 
Math.max(targetChunkSize / 10, 1L);
+
+  private final long[] lastWrittenPointTotalSizeCheckIndexArray = new 
long[subTaskNum];
 
   // if unsealed chunk size is lower then this, then deserialize next chunk no 
matter it is
   // overlapped or not
@@ -122,10 +136,24 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
   }
 
   public void startMeasurement(String measurement, IChunkWriter chunkWriter, 
int subTaskId) {
-    lastCheckIndexArray[subTaskId] = 0;
+    resetChunkWriterStatistics(subTaskId);
     lastTimeSet[subTaskId] = false;
     chunkWriters[subTaskId] = chunkWriter;
     measurementId[subTaskId] = measurement;
+    hasVariableLengthTypeArray[subTaskId] = 
containsVariableLengthType(chunkWriter);
+  }
+
+  private boolean containsVariableLengthType(IChunkWriter chunkWriter) {
+    if (chunkWriter instanceof ChunkWriterImpl) {
+      return ((ChunkWriterImpl) chunkWriter).getDataType().isBinary();
+    }
+    AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
+    for (ValueChunkWriter valueChunkWriter : 
alignedChunkWriter.getValueChunkWriterList()) {
+      if (valueChunkWriter.getDataType().isBinary()) {
+        return true;
+      }
+    }
+    return false;
   }
 
   public abstract void endMeasurement(int subTaskId) throws IOException;
@@ -146,7 +174,9 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
    */
   public abstract void checkAndMayFlushChunkMetadata() throws IOException;
 
-  protected void writeDataPoint(long timestamp, TsPrimitiveType value, 
IChunkWriter chunkWriter) {
+  protected void writeDataPoint(
+      long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter, int 
subTaskId) {
+    long writtenPointTotalSize = 0;
     if (chunkWriter instanceof ChunkWriterImpl) {
       ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
       switch (chunkWriterImpl.getDataType()) {
@@ -155,6 +185,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
         case BLOB:
         case OBJECT:
           chunkWriterImpl.write(timestamp, value.getBinary());
+          writtenPointTotalSize += value.getBinary().getLength();
           break;
         case DOUBLE:
           chunkWriterImpl.write(timestamp, value.getDouble());
@@ -180,7 +211,86 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     } else {
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriter;
       alignedChunkWriter.write(timestamp, value.getVector());
+      if (hasVariableLengthTypeArray[subTaskId]) {
+        writtenPointTotalSize = estimateWrittenPointTotalSize(value);
+      }
+    }
+    chunkPointNumArray[subTaskId]++;
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      writtenPointTotalSizeArray[subTaskId] += writtenPointTotalSize;
+    }
+  }
+
+  private long estimateWrittenPointTotalSize(TsPrimitiveType value) {
+    long size = Long.BYTES;
+    TsPrimitiveType[] vector = value.getVector();
+    for (TsPrimitiveType tsPrimitiveType : vector) {
+      if (tsPrimitiveType == null) {
+        continue;
+      }
+      TSDataType dataType = tsPrimitiveType.getDataType();
+      switch (dataType) {
+        case TEXT:
+        case STRING:
+        case BLOB:
+        case OBJECT:
+          size += tsPrimitiveType.getBinary().getLength();
+          break;
+        case DOUBLE:
+        case INT64:
+        case TIMESTAMP:
+          size += Long.BYTES;
+          break;
+        case INT32:
+        case DATE:
+        case FLOAT:
+          size += Integer.BYTES;
+          break;
+        case BOOLEAN:
+          size += 1;
+          break;
+        default:
+          break;
+      }
     }
+    return size;
+  }
+
+  protected long estimateWrittenPointTotalSize(TsBlock tsBlock) {
+    int pointCount = tsBlock.getPositionCount();
+    long size = 0;
+    Column[] columns = tsBlock.getValueColumns();
+    for (Column column : columns) {
+      TSDataType dataType = column.getDataType();
+      if (dataType.isBinary()) {
+        for (int j = 0; j < pointCount; j++) {
+          if (column.isNull(j)) {
+            continue;
+          }
+          size += column.getBinary(j).getLength();
+        }
+        continue;
+      }
+      // Checkpoint estimate only: fixed-width columns count every row, nulls 
included.
+      switch (dataType) {
+        case DOUBLE:
+        case INT64:
+        case TIMESTAMP:
+          size += (long) Long.BYTES * pointCount;
+          break;
+        case INT32:
+        case DATE:
+        case FLOAT:
+          size += (long) Integer.BYTES * pointCount;
+          break;
+        case BOOLEAN:
+          size += pointCount;
+          break;
+        default:
+          break;
+      }
+    }
+    return size;
   }
 
   @SuppressWarnings("squid:S2445")
@@ -190,7 +300,14 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     synchronized (targetWriter) {
       targetWriter.writeChunk(chunkWriter);
     }
+    resetChunkWriterStatistics(subTaskId);
+  }
+
+  private void resetChunkWriterStatistics(int subTaskId) {
     chunkPointNumArray[subTaskId] = 0;
+    writtenPointTotalSizeArray[subTaskId] = 0;
+    lastCheckIndexArray[subTaskId] = 0;
+    lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = 0;
   }
 
   public abstract EncryptParameter getEncryptParameter();
@@ -214,7 +331,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     synchronized (targetWriter) {
       // seal last chunk to file writer
       targetWriter.writeChunk(chunkWriters[subTaskId]);
-      chunkPointNumArray[subTaskId] = 0;
+      resetChunkWriterStatistics(subTaskId);
       targetWriter.writeChunk(chunk, chunkMetadata);
     }
   }
@@ -232,7 +349,7 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
       AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) 
chunkWriters[subTaskId];
       // seal last chunk to file writer
       targetWriter.writeChunk(alignedChunkWriter);
-      chunkPointNumArray[subTaskId] = 0;
+      resetChunkWriterStatistics(subTaskId);
 
       targetWriter.markStartingWritingAligned();
 
@@ -279,6 +396,9 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader);
 
     chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount();
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      writtenPointTotalSizeArray[subTaskId] += 
pageHeader.getSerializedPageSize();
+    }
   }
 
   public abstract boolean flushAlignedPage(AlignedPageElement 
alignedPageElement, int subTaskId)
@@ -303,29 +423,45 @@ public abstract class AbstractCompactionWriter implements 
AutoCloseable {
     // flush new time page to chunk writer directly
     
alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData, 
timePageHeader);
 
+    long writtenValuePageSize = 0;
     // flush new value pages to chunk writer directly
     for (int i = 0; i < valuePageHeaders.size(); i++) {
-      if (valuePageHeaders.get(i) == null) {
+      PageHeader valuePageHeader = valuePageHeaders.get(i);
+      if (valuePageHeader == null) {
         // sub sensor does not exist in current file or value page has been 
deleted completely
         
alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer();
         continue;
       }
       alignedChunkWriter.writePageHeaderAndDataIntoValueBuff(
-          compressedValuePageDatas.get(i), valuePageHeaders.get(i), i);
+          compressedValuePageDatas.get(i), valuePageHeader, i);
+      if (hasVariableLengthTypeArray[subTaskId]) {
+        writtenValuePageSize += valuePageHeader.getSerializedPageSize();
+      }
     }
 
     chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount();
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      // Direct-flushed pages are already serialized, so use value page size 
as checkpoint estimate.
+      writtenPointTotalSizeArray[subTaskId] += writtenValuePageSize;
+    }
   }
 
   protected void checkChunkSizeAndMayOpenANewChunk(
       CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int 
subTaskId)
       throws IOException {
-    if (chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) 
* checkPoint) {
-      // if chunk point num reaches the check point, then check if the chunk 
size over threshold
+    boolean reachesPointCheckPoint =
+        chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) 
* checkPoint;
+    boolean reachesSizeCheckPoint =
+        hasVariableLengthTypeArray[subTaskId]
+            && writtenPointTotalSizeArray[subTaskId]
+                >= (lastWrittenPointTotalSizeCheckIndexArray[subTaskId] + 1)
+                    * writtenPointTotalSizeCheckPoint;
+    if (reachesPointCheckPoint || reachesSizeCheckPoint) {
       lastCheckIndexArray[subTaskId] = chunkPointNumArray[subTaskId] / 
checkPoint;
+      lastWrittenPointTotalSizeCheckIndexArray[subTaskId] =
+          writtenPointTotalSizeArray[subTaskId] / 
writtenPointTotalSizeCheckPoint;
       if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, 
targetChunkPointNum, false)) {
         sealChunk(fileWriter, chunkWriter, subTaskId);
-        lastCheckIndexArray[subTaskId] = 0;
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index 61cec47d39a..ac4120614de 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -150,8 +150,7 @@ public abstract class AbstractCrossCompactionWriter extends 
AbstractCompactionWr
 
     checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
     int fileIndex = seqFileIndexArray[subTaskId];
-    writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
-    chunkPointNumArray[subTaskId]++;
+    writeDataPoint(timestamp, value, chunkWriters[subTaskId], subTaskId);
     checkChunkSizeAndMayOpenANewChunk(
         targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
     isDeviceExistedInTargetFiles[fileIndex] = true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index 0ace4d5cb97..c59c658708c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -142,8 +142,8 @@ public abstract class AbstractInnerCompactionWriter extends 
AbstractCompactionWr
   @Override
   public void write(TimeValuePair timeValuePair, int subTaskId) throws 
IOException {
     checkPreviousTimestamp(timeValuePair.getTimestamp(), subTaskId);
-    writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), 
chunkWriters[subTaskId]);
-    chunkPointNumArray[subTaskId]++;
+    writeDataPoint(
+        timeValuePair.getTimestamp(), timeValuePair.getValue(), 
chunkWriters[subTaskId], subTaskId);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], 
subTaskId);
     lastTime[subTaskId] = timeValuePair.getTimestamp();
     lastTimeSet[subTaskId] = true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
index 6810df4d1a3..65b955aa757 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
@@ -68,13 +68,16 @@ public class ReadPointCrossCompactionWriter extends 
AbstractCrossCompactionWrite
     checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), 
subTaskId);
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) 
this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
+    chunkPointNumArray[subTaskId] += batchSize;
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      writtenPointTotalSizeArray[subTaskId] += 
estimateWrittenPointTotalSize(tsBlock);
+    }
     synchronized (this) {
       // we need to synchronized here to avoid multi-thread competition in 
sub-task
       TsFileResource resource = 
targetResources.get(seqFileIndexArray[subTaskId]);
       resource.updateStartTime(deviceId, timestamps.getStartTime());
       resource.updateEndTime(deviceId, timestamps.getEndTime());
     }
-    chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
     checkChunkSizeAndMayOpenANewChunk(
         targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, 
subTaskId);
     isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
index 5530a1995ce..ec266f9cde8 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
@@ -71,7 +71,10 @@ public class ReadPointInnerCompactionWriter extends 
AbstractInnerCompactionWrite
     int batchSize = tsBlock.getPositionCount();
     AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) 
this.chunkWriters[subTaskId];
     chunkWriter.write(timestamps, columns, batchSize);
-    chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
+    chunkPointNumArray[subTaskId] += batchSize;
+    if (hasVariableLengthTypeArray[subTaskId]) {
+      writtenPointTotalSizeArray[subTaskId] += 
estimateWrittenPointTotalSize(tsBlock);
+    }
     checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
index 15d59be8d68..63e8a58a440 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
@@ -84,8 +84,8 @@ public class RepairUnsortedFileCompactionWriter extends 
ReadPointInnerCompaction
   }
 
   private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId) 
throws IOException {
-    writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), 
chunkWriters[subTaskId]);
-    chunkPointNumArray[subTaskId]++;
+    writeDataPoint(
+        timeValuePair.getTimestamp(), timeValuePair.getValue(), 
chunkWriters[subTaskId], subTaskId);
     checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], 
subTaskId);
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
new file mode 100644
index 00000000000..de28c629215
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer;
+
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
+import 
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
+
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class AbstractCompactionWriterTest {
+
+  private static final int SUB_TASK_ID = 0;
+
+  @Test
+  public void 
testBinarySizeCheckpointTriggersChunkSizeCheckBeforePointCheckpoint()
+      throws IOException, PageException {
+    TestCompactionWriter compactionWriter = new TestCompactionWriter();
+    CountingChunkWriter chunkWriter = new CountingChunkWriter();
+    PageHeader pageHeader =
+        
createPageHeader(compactionWriter.getCompressedSizeToReachSizeCheckpoint());
+
+    compactionWriter.startMeasurement("s1", chunkWriter, SUB_TASK_ID);
+    compactionWriter.flushNonAlignedPageToChunkWriter(
+        chunkWriter, ByteBuffer.allocate(0), pageHeader, SUB_TASK_ID);
+    compactionWriter.checkChunkSizeAndMayOpenANewChunk(null, chunkWriter, 
SUB_TASK_ID);
+
+    Assert.assertEquals(1, chunkWriter.chunkSizeCheckCount);
+  }
+
+  private static PageHeader createPageHeader(int compressedSize) {
+    Statistics<?> statistics = Statistics.getStatsByType(TSDataType.TEXT);
+    return new PageHeader(compressedSize, compressedSize, statistics);
+  }
+
+  private static class CountingChunkWriter extends ChunkWriterImpl {
+
+    private int chunkSizeCheckCount;
+
+    private CountingChunkWriter() {
+      super(new MeasurementSchema("s1", TSDataType.TEXT));
+    }
+
+    @Override
+    public boolean checkIsChunkSizeOverThreshold(
+        long size, long pointNum, boolean returnTrueIfChunkEmpty) {
+      chunkSizeCheckCount++;
+      return false;
+    }
+  }
+
+  private static class TestCompactionWriter extends AbstractCompactionWriter {
+
+    private int getCompressedSizeToReachSizeCheckpoint() {
+      return (int) Math.max(targetChunkSize / 10, 1L);
+    }
+
+    @Override
+    public void startChunkGroup(IDeviceID deviceId, boolean isAlign) {}
+
+    @Override
+    public void endChunkGroup() {}
+
+    @Override
+    public void endMeasurement(int subTaskId) {}
+
+    @Override
+    public void write(TimeValuePair timeValuePair, int subTaskId) {}
+
+    @Override
+    public void write(TsBlock tsBlock, int subTaskId) {}
+
+    @Override
+    public void endFile() {}
+
+    @Override
+    public long getWriterSize() {
+      return 0;
+    }
+
+    @Override
+    public void checkAndMayFlushChunkMetadata() {}
+
+    @Override
+    public EncryptParameter getEncryptParameter() {
+      return null;
+    }
+
+    @Override
+    public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata 
chunkMetadata, int subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushAlignedChunk(ChunkMetadataElement 
chunkMetadataElement, int subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushBatchedValueChunk(
+        ChunkMetadataElement chunkMetadataElement,
+        int subTaskId,
+        AbstractCompactionFlushController flushController) {
+      return false;
+    }
+
+    @Override
+    public boolean flushNonAlignedPage(
+        ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int 
subTaskId) {
+      return false;
+    }
+
+    @Override
+    public boolean flushBatchedValuePage(
+        AlignedPageElement alignedPageElement,
+        int subTaskId,
+        AbstractCompactionFlushController flushController) {
+      return false;
+    }
+
+    @Override
+    public void setSchemaForAllTargetFile(List<Schema> schemas) {}
+
+    @Override
+    public void close() {}
+  }
+}

Reply via email to