This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d58313225e6 Fix compaction writer size checkpointing (#17941)
d58313225e6 is described below
commit d58313225e68c5423356cdab8fbabb2f697fde64
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jun 16 09:34:26 2026 +0800
Fix compaction writer size checkpointing (#17941)
---
.../utils/writer/AbstractCompactionWriter.java | 165 ++++++++++++++++++--
.../writer/AbstractCrossCompactionWriter.java | 3 +-
.../writer/AbstractInnerCompactionWriter.java | 4 +-
.../utils/writer/FastInnerCompactionWriter.java | 10 +-
.../writer/ReadPointCrossCompactionWriter.java | 5 +-
.../writer/ReadPointInnerCompactionWriter.java | 5 +-
.../writer/RepairUnsortedFileCompactionWriter.java | 4 +-
.../utils/writer/AbstractCompactionWriterTest.java | 167 +++++++++++++++++++++
8 files changed, 343 insertions(+), 20 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
index 0009c1e83e6..56ee94acf0c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java
@@ -24,13 +24,16 @@ import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.i18n.StorageEngineMessages;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.exception.CompactionLastTimeCheckFailedException;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.task.CompactionTaskSummary;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.batch.utils.FollowingBatchCompactionAlignedChunkWriter;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
import
org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter;
import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry;
+import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.exception.write.PageException;
import org.apache.tsfile.file.header.PageHeader;
import org.apache.tsfile.file.metadata.ChunkMetadata;
@@ -66,6 +69,13 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
// The index of the array corresponds to subTaskId.
protected int[] chunkPointNumArray = new int[subTaskNum];
+ // Each sub task has estimated total size of written points in current chunk.
+ // The index of the array corresponds to subTaskId.
+ protected long[] writtenPointTotalSizeArray = new long[subTaskNum];
+
+ // Whether each sub task's current chunk writer contains TEXT, STRING, BLOB or OBJECT.
+ protected boolean[] hasVariableLengthTypeArray = new boolean[subTaskNum];
+
// used to control the target chunk size
protected long targetChunkSize =
IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize();
@@ -77,7 +87,12 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
@SuppressWarnings("squid:S1170")
private final long checkPoint = (targetChunkPointNum >= 10 ?
targetChunkPointNum : 10) / 10;
- private long lastCheckIndex = 0;
+ private final long[] lastCheckIndexArray = new long[subTaskNum];
+
+ // When estimated size of written points reaches check point, then check chunk size.
+ private final long writtenPointTotalSizeCheckPoint =
Math.max(targetChunkSize / 10, 1L);
+
+ private final long[] lastWrittenPointTotalSizeCheckIndexArray = new
long[subTaskNum];
// if unsealed chunk size is lower then this, then deserialize next chunk no matter it is
// overlapped or not
@@ -122,10 +137,24 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
}
public void startMeasurement(String measurement, IChunkWriter chunkWriter,
int subTaskId) {
- lastCheckIndex = 0;
+ resetChunkWriterStatistics(subTaskId);
lastTimeSet[subTaskId] = false;
chunkWriters[subTaskId] = chunkWriter;
measurementId[subTaskId] = measurement;
+ hasVariableLengthTypeArray[subTaskId] =
containsVariableLengthType(chunkWriter);
+ }
+
+ private boolean containsVariableLengthType(IChunkWriter chunkWriter) {
+ if (chunkWriter instanceof ChunkWriterImpl) {
+ return ((ChunkWriterImpl) chunkWriter).getDataType().isBinary();
+ }
+ AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriter;
+ for (ValueChunkWriter valueChunkWriter :
alignedChunkWriter.getValueChunkWriterList()) {
+ if (valueChunkWriter.getDataType().isBinary()) {
+ return true;
+ }
+ }
+ return false;
}
public abstract void endMeasurement(int subTaskId) throws IOException;
@@ -146,7 +175,9 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
*/
public abstract void checkAndMayFlushChunkMetadata() throws IOException;
- protected void writeDataPoint(long timestamp, TsPrimitiveType value,
IChunkWriter chunkWriter) {
+ protected void writeDataPoint(
+ long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter, int
subTaskId) {
+ long writtenPointTotalSize = 0;
if (chunkWriter instanceof ChunkWriterImpl) {
ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter;
switch (chunkWriterImpl.getDataType()) {
@@ -155,6 +186,7 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
case BLOB:
case OBJECT:
chunkWriterImpl.write(timestamp, value.getBinary());
+ writtenPointTotalSize += value.getBinary().getLength();
break;
case DOUBLE:
chunkWriterImpl.write(timestamp, value.getDouble());
@@ -180,9 +212,88 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
} else {
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriter;
alignedChunkWriter.write(timestamp, value.getVector());
+ if (hasVariableLengthTypeArray[subTaskId]) {
+ writtenPointTotalSize = estimateWrittenPointTotalSize(value);
+ }
+ }
+ chunkPointNumArray[subTaskId]++;
+ if (hasVariableLengthTypeArray[subTaskId]) {
+ writtenPointTotalSizeArray[subTaskId] += writtenPointTotalSize;
}
}
+ private long estimateWrittenPointTotalSize(TsPrimitiveType value) {
+ long size = Long.BYTES;
+ TsPrimitiveType[] vector = value.getVector();
+ for (TsPrimitiveType tsPrimitiveType : vector) {
+ if (tsPrimitiveType == null) {
+ continue;
+ }
+ TSDataType dataType = tsPrimitiveType.getDataType();
+ switch (dataType) {
+ case TEXT:
+ case STRING:
+ case BLOB:
+ case OBJECT:
+ size += tsPrimitiveType.getBinary().getLength();
+ break;
+ case DOUBLE:
+ case INT64:
+ case TIMESTAMP:
+ size += Long.BYTES;
+ break;
+ case INT32:
+ case DATE:
+ case FLOAT:
+ size += Integer.BYTES;
+ break;
+ case BOOLEAN:
+ size += 1;
+ break;
+ default:
+ break;
+ }
+ }
+ return size;
+ }
+
+ protected long estimateWrittenPointTotalSize(TsBlock tsBlock) {
+ int pointCount = tsBlock.getPositionCount();
+ long size = (long) Long.BYTES * pointCount;
+ Column[] columns = tsBlock.getValueColumns();
+ for (Column column : columns) {
+ TSDataType dataType = column.getDataType();
+ if (dataType.isBinary()) {
+ for (int j = 0; j < pointCount; j++) {
+ if (column.isNull(j)) {
+ continue;
+ }
+ size += column.getBinary(j).getLength();
+ }
+ continue;
+ }
+ // This is only used as a checkpoint estimate, so fixed-width values use count directly.
+ switch (dataType) {
+ case DOUBLE:
+ case INT64:
+ case TIMESTAMP:
+ size += (long) Long.BYTES * pointCount;
+ break;
+ case INT32:
+ case DATE:
+ case FLOAT:
+ size += (long) Integer.BYTES * pointCount;
+ break;
+ case BOOLEAN:
+ size += pointCount;
+ break;
+ default:
+ break;
+ }
+ }
+ return size;
+ }
+
@SuppressWarnings("squid:S2445")
protected void sealChunk(
CompactionTsFileWriter targetWriter, IChunkWriter chunkWriter, int
subTaskId)
@@ -190,7 +301,14 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
synchronized (targetWriter) {
targetWriter.writeChunk(chunkWriter);
}
+ resetChunkWriterStatistics(subTaskId);
+ }
+
+ private void resetChunkWriterStatistics(int subTaskId) {
chunkPointNumArray[subTaskId] = 0;
+ writtenPointTotalSizeArray[subTaskId] = 0;
+ lastCheckIndexArray[subTaskId] = 0;
+ lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = 0;
}
public abstract EncryptParameter getEncryptParameter();
@@ -214,7 +332,7 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
synchronized (targetWriter) {
// seal last chunk to file writer
targetWriter.writeChunk(chunkWriters[subTaskId]);
- chunkPointNumArray[subTaskId] = 0;
+ resetChunkWriterStatistics(subTaskId);
targetWriter.writeChunk(chunk, chunkMetadata);
}
}
@@ -232,7 +350,7 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl)
chunkWriters[subTaskId];
// seal last chunk to file writer
targetWriter.writeChunk(alignedChunkWriter);
- chunkPointNumArray[subTaskId] = 0;
+ resetChunkWriterStatistics(subTaskId);
targetWriter.markStartingWritingAligned();
@@ -279,6 +397,9 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader);
chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount();
+ if (hasVariableLengthTypeArray[subTaskId]) {
+ writtenPointTotalSizeArray[subTaskId] +=
pageHeader.getSerializedPageSize();
+ }
}
public abstract boolean flushAlignedPage(AlignedPageElement
alignedPageElement, int subTaskId)
@@ -303,29 +424,51 @@ public abstract class AbstractCompactionWriter implements
AutoCloseable {
// flush new time page to chunk writer directly
alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData,
timePageHeader);
+ long writtenValuePageSize = 0;
// flush new value pages to chunk writer directly
for (int i = 0; i < valuePageHeaders.size(); i++) {
- if (valuePageHeaders.get(i) == null) {
+ PageHeader valuePageHeader = valuePageHeaders.get(i);
+ if (valuePageHeader == null) {
// sub sensor does not exist in current file or value page has been deleted completely
alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer();
continue;
}
alignedChunkWriter.writePageHeaderAndDataIntoValueBuff(
- compressedValuePageDatas.get(i), valuePageHeaders.get(i), i);
+ compressedValuePageDatas.get(i), valuePageHeader, i);
+ if (hasVariableLengthTypeArray[subTaskId]) {
+ writtenValuePageSize += valuePageHeader.getSerializedPageSize();
+ }
}
chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount();
+ if (hasVariableLengthTypeArray[subTaskId]) {
+ // Direct-flushed pages are already serialized, so use page size as checkpoint estimate.
+ writtenPointTotalSizeArray[subTaskId] +=
+ timePageHeader.getSerializedPageSize() + writtenValuePageSize;
+ }
}
protected void checkChunkSizeAndMayOpenANewChunk(
CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int
subTaskId)
throws IOException {
- if (chunkPointNumArray[subTaskId] >= (lastCheckIndex + 1) * checkPoint) {
- // if chunk point num reaches the check point, then check if the chunk size over threshold
- lastCheckIndex = chunkPointNumArray[subTaskId] / checkPoint;
+ if (chunkWriter instanceof FollowingBatchCompactionAlignedChunkWriter
+ && chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize,
targetChunkPointNum, false)) {
+ sealChunk(fileWriter, chunkWriter, subTaskId);
+ return;
+ }
+ boolean reachesPointCheckPoint =
+ chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1)
* checkPoint;
+ boolean reachesSizeCheckPoint =
+ hasVariableLengthTypeArray[subTaskId]
+ && writtenPointTotalSizeArray[subTaskId]
+ >= (lastWrittenPointTotalSizeCheckIndexArray[subTaskId] + 1)
+ * writtenPointTotalSizeCheckPoint;
+ if (reachesPointCheckPoint || reachesSizeCheckPoint) {
+ lastCheckIndexArray[subTaskId] = chunkPointNumArray[subTaskId] /
checkPoint;
+ lastWrittenPointTotalSizeCheckIndexArray[subTaskId] =
+ writtenPointTotalSizeArray[subTaskId] /
writtenPointTotalSizeCheckPoint;
if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize,
targetChunkPointNum, false)) {
sealChunk(fileWriter, chunkWriter, subTaskId);
- lastCheckIndex = 0;
}
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
index 61cec47d39a..ac4120614de 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java
@@ -150,8 +150,7 @@ public abstract class AbstractCrossCompactionWriter extends
AbstractCompactionWr
checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId);
int fileIndex = seqFileIndexArray[subTaskId];
- writeDataPoint(timestamp, value, chunkWriters[subTaskId]);
- chunkPointNumArray[subTaskId]++;
+ writeDataPoint(timestamp, value, chunkWriters[subTaskId], subTaskId);
checkChunkSizeAndMayOpenANewChunk(
targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId);
isDeviceExistedInTargetFiles[fileIndex] = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
index 0ace4d5cb97..c59c658708c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java
@@ -142,8 +142,8 @@ public abstract class AbstractInnerCompactionWriter extends
AbstractCompactionWr
@Override
public void write(TimeValuePair timeValuePair, int subTaskId) throws
IOException {
checkPreviousTimestamp(timeValuePair.getTimestamp(), subTaskId);
- writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(),
chunkWriters[subTaskId]);
- chunkPointNumArray[subTaskId]++;
+ writeDataPoint(
+ timeValuePair.getTimestamp(), timeValuePair.getValue(),
chunkWriters[subTaskId], subTaskId);
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId);
lastTime[subTaskId] = timeValuePair.getTimestamp();
lastTimeSet[subTaskId] = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java
index cdf93b45728..ccac7b4c635 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/FastInnerCompactionWriter.java
@@ -199,6 +199,8 @@ public class FastInnerCompactionWriter extends
AbstractInnerCompactionWriter {
valuePageHeaders,
subTaskId);
+ checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId);
+
lastTime[subTaskId] = timePageHeader.getEndTime();
lastTimeSet[subTaskId] = true;
return true;
@@ -235,6 +237,8 @@ public class FastInnerCompactionWriter extends
AbstractInnerCompactionWriter {
valuePageHeaders,
subTaskId);
+ checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId);
+
lastTime[subTaskId] = timePageHeader.getEndTime();
lastTimeSet[subTaskId] = true;
return true;
@@ -245,10 +249,12 @@ public class FastInnerCompactionWriter extends
AbstractInnerCompactionWriter {
* successfully or not. Return false if the unsealed page is too small or the end time of page
* exceeds the end time of file, else return true.
*
+ * @throws IOException if io errors occurred
* @throws PageException if errors occurred when write data page header
*/
public boolean flushNonAlignedPage(
- ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId)
throws PageException {
+ ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId)
+ throws IOException, PageException {
checkPreviousTimestamp(pageHeader.getStartTime(), subTaskId);
boolean isUnsealedPageOverThreshold =
chunkWriters[subTaskId].checkIsUnsealedPageOverThreshold(
@@ -261,6 +267,8 @@ public class FastInnerCompactionWriter extends
AbstractInnerCompactionWriter {
flushNonAlignedPageToChunkWriter(
(ChunkWriterImpl) chunkWriters[subTaskId], compressedPageData,
pageHeader, subTaskId);
+ checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId);
+
lastTime[subTaskId] = pageHeader.getEndTime();
lastTimeSet[subTaskId] = true;
return true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
index 6810df4d1a3..65b955aa757 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java
@@ -68,13 +68,16 @@ public class ReadPointCrossCompactionWriter extends
AbstractCrossCompactionWrite
checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(),
subTaskId);
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl)
this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
+ chunkPointNumArray[subTaskId] += batchSize;
+ if (hasVariableLengthTypeArray[subTaskId]) {
+ writtenPointTotalSizeArray[subTaskId] +=
estimateWrittenPointTotalSize(tsBlock);
+ }
synchronized (this) {
// we need to synchronized here to avoid multi-thread competition in sub-task
TsFileResource resource =
targetResources.get(seqFileIndexArray[subTaskId]);
resource.updateStartTime(deviceId, timestamps.getStartTime());
resource.updateEndTime(deviceId, timestamps.getEndTime());
}
- chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
checkChunkSizeAndMayOpenANewChunk(
targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter,
subTaskId);
isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
index 5530a1995ce..ec266f9cde8 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java
@@ -71,7 +71,10 @@ public class ReadPointInnerCompactionWriter extends
AbstractInnerCompactionWrite
int batchSize = tsBlock.getPositionCount();
AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl)
this.chunkWriters[subTaskId];
chunkWriter.write(timestamps, columns, batchSize);
- chunkPointNumArray[subTaskId] += timestamps.getTimes().length;
+ chunkPointNumArray[subTaskId] += batchSize;
+ if (hasVariableLengthTypeArray[subTaskId]) {
+ writtenPointTotalSizeArray[subTaskId] +=
estimateWrittenPointTotalSize(tsBlock);
+ }
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
index 15d59be8d68..63e8a58a440 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java
@@ -84,8 +84,8 @@ public class RepairUnsortedFileCompactionWriter extends
ReadPointInnerCompaction
}
private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId)
throws IOException {
- writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(),
chunkWriters[subTaskId]);
- chunkPointNumArray[subTaskId]++;
+ writeDataPoint(
+ timeValuePair.getTimestamp(), timeValuePair.getValue(),
chunkWriters[subTaskId], subTaskId);
checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId],
subTaskId);
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
new file mode 100644
index 00000000000..de28c629215
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer;
+
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement;
+import
org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController;
+
+import org.apache.tsfile.encrypt.EncryptParameter;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.PageException;
+import org.apache.tsfile.file.header.PageHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.statistics.Statistics;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.Schema;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public class AbstractCompactionWriterTest {
+
+ private static final int SUB_TASK_ID = 0;
+
+ @Test
+ public void
testBinarySizeCheckpointTriggersChunkSizeCheckBeforePointCheckpoint()
+ throws IOException, PageException {
+ TestCompactionWriter compactionWriter = new TestCompactionWriter();
+ CountingChunkWriter chunkWriter = new CountingChunkWriter();
+ PageHeader pageHeader =
+
createPageHeader(compactionWriter.getCompressedSizeToReachSizeCheckpoint());
+
+ compactionWriter.startMeasurement("s1", chunkWriter, SUB_TASK_ID);
+ compactionWriter.flushNonAlignedPageToChunkWriter(
+ chunkWriter, ByteBuffer.allocate(0), pageHeader, SUB_TASK_ID);
+ compactionWriter.checkChunkSizeAndMayOpenANewChunk(null, chunkWriter,
SUB_TASK_ID);
+
+ Assert.assertEquals(1, chunkWriter.chunkSizeCheckCount);
+ }
+
+ private static PageHeader createPageHeader(int compressedSize) {
+ Statistics<?> statistics = Statistics.getStatsByType(TSDataType.TEXT);
+ return new PageHeader(compressedSize, compressedSize, statistics);
+ }
+
+ private static class CountingChunkWriter extends ChunkWriterImpl {
+
+ private int chunkSizeCheckCount;
+
+ private CountingChunkWriter() {
+ super(new MeasurementSchema("s1", TSDataType.TEXT));
+ }
+
+ @Override
+ public boolean checkIsChunkSizeOverThreshold(
+ long size, long pointNum, boolean returnTrueIfChunkEmpty) {
+ chunkSizeCheckCount++;
+ return false;
+ }
+ }
+
+ private static class TestCompactionWriter extends AbstractCompactionWriter {
+
+ private int getCompressedSizeToReachSizeCheckpoint() {
+ return (int) Math.max(targetChunkSize / 10, 1L);
+ }
+
+ @Override
+ public void startChunkGroup(IDeviceID deviceId, boolean isAlign) {}
+
+ @Override
+ public void endChunkGroup() {}
+
+ @Override
+ public void endMeasurement(int subTaskId) {}
+
+ @Override
+ public void write(TimeValuePair timeValuePair, int subTaskId) {}
+
+ @Override
+ public void write(TsBlock tsBlock, int subTaskId) {}
+
+ @Override
+ public void endFile() {}
+
+ @Override
+ public long getWriterSize() {
+ return 0;
+ }
+
+ @Override
+ public void checkAndMayFlushChunkMetadata() {}
+
+ @Override
+ public EncryptParameter getEncryptParameter() {
+ return null;
+ }
+
+ @Override
+ public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata
chunkMetadata, int subTaskId) {
+ return false;
+ }
+
+ @Override
+ public boolean flushAlignedChunk(ChunkMetadataElement
chunkMetadataElement, int subTaskId) {
+ return false;
+ }
+
+ @Override
+ public boolean flushBatchedValueChunk(
+ ChunkMetadataElement chunkMetadataElement,
+ int subTaskId,
+ AbstractCompactionFlushController flushController) {
+ return false;
+ }
+
+ @Override
+ public boolean flushNonAlignedPage(
+ ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) {
+ return false;
+ }
+
+ @Override
+ public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int
subTaskId) {
+ return false;
+ }
+
+ @Override
+ public boolean flushBatchedValuePage(
+ AlignedPageElement alignedPageElement,
+ int subTaskId,
+ AbstractCompactionFlushController flushController) {
+ return false;
+ }
+
+ @Override
+ public void setSchemaForAllTargetFile(List<Schema> schemas) {}
+
+ @Override
+ public void close() {}
+ }
+}