This is an automated email from the ASF dual-hosted git repository. shuwenwei pushed a commit to branch fixBug0612 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 3a89da2b5022496517345cdf68ba8f0d86b2eac4 Author: shuwenwei <[email protected]> AuthorDate: Mon Jun 15 16:27:29 2026 +0800 Fix compaction writer size checkpointing --- .../utils/writer/AbstractCompactionWriter.java | 156 +++++++++++++++++-- .../writer/AbstractCrossCompactionWriter.java | 3 +- .../writer/AbstractInnerCompactionWriter.java | 4 +- .../writer/ReadPointCrossCompactionWriter.java | 5 +- .../writer/ReadPointInnerCompactionWriter.java | 5 +- .../writer/RepairUnsortedFileCompactionWriter.java | 4 +- .../utils/writer/AbstractCompactionWriterTest.java | 167 +++++++++++++++++++++ 7 files changed, 326 insertions(+), 18 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java index 8c0c37094ea..3e0bf0d7b02 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriter.java @@ -30,7 +30,9 @@ import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.wri import org.apache.iotdb.db.storageengine.dataregion.compaction.io.CompactionTsFileWriter; import org.apache.iotdb.db.storageengine.dataregion.modification.ModEntry; +import org.apache.tsfile.block.column.Column; import org.apache.tsfile.encrypt.EncryptParameter; +import org.apache.tsfile.enums.TSDataType; import org.apache.tsfile.exception.write.PageException; import org.apache.tsfile.file.header.PageHeader; import org.apache.tsfile.file.metadata.ChunkMetadata; @@ -66,6 +68,13 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { // The index of the array corresponds to subTaskId. protected int[] chunkPointNumArray = new int[subTaskNum]; + // Each sub task has estimated total size of written points in current chunk. + // The index of the array corresponds to subTaskId. + protected long[] writtenPointTotalSizeArray = new long[subTaskNum]; + + // Whether each sub task's current chunk writer contains TEXT, STRING, BLOB or OBJECT. + protected boolean[] hasVariableLengthTypeArray = new boolean[subTaskNum]; + // used to control the target chunk size protected long targetChunkSize = IoTDBDescriptor.getInstance().getConfig().getTargetChunkSize(); @@ -77,7 +86,12 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { @SuppressWarnings("squid:S1170") private final long checkPoint = (targetChunkPointNum >= 10 ? targetChunkPointNum : 10) / 10; - private long[] lastCheckIndexArray = new long[subTaskNum]; + private final long[] lastCheckIndexArray = new long[subTaskNum]; + + // When estimated size of written points reaches check point, then check chunk size. + private final long writtenPointTotalSizeCheckPoint = Math.max(targetChunkSize / 10, 1L); + + private final long[] lastWrittenPointTotalSizeCheckIndexArray = new long[subTaskNum]; // if unsealed chunk size is lower then this, then deserialize next chunk no matter it is // overlapped or not @@ -122,10 +136,24 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } public void startMeasurement(String measurement, IChunkWriter chunkWriter, int subTaskId) { - lastCheckIndexArray[subTaskId] = 0; + resetChunkWriterStatistics(subTaskId); lastTimeSet[subTaskId] = false; chunkWriters[subTaskId] = chunkWriter; measurementId[subTaskId] = measurement; + hasVariableLengthTypeArray[subTaskId] = containsVariableLengthType(chunkWriter); + } + + private boolean containsVariableLengthType(IChunkWriter chunkWriter) { + if (chunkWriter instanceof ChunkWriterImpl) { + return ((ChunkWriterImpl) chunkWriter).getDataType().isBinary(); + } + AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; + for (ValueChunkWriter valueChunkWriter : alignedChunkWriter.getValueChunkWriterList()) { + if (valueChunkWriter.getDataType().isBinary()) { + return true; + } + } + return false; } public abstract void endMeasurement(int subTaskId) throws IOException; @@ -146,7 +174,9 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { */ public abstract void checkAndMayFlushChunkMetadata() throws IOException; - protected void writeDataPoint(long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter) { + protected void writeDataPoint( + long timestamp, TsPrimitiveType value, IChunkWriter chunkWriter, int subTaskId) { + long writtenPointTotalSize = 0; if (chunkWriter instanceof ChunkWriterImpl) { ChunkWriterImpl chunkWriterImpl = (ChunkWriterImpl) chunkWriter; switch (chunkWriterImpl.getDataType()) { @@ -155,6 +185,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { case BLOB: case OBJECT: chunkWriterImpl.write(timestamp, value.getBinary()); + writtenPointTotalSize += value.getBinary().getLength(); break; case DOUBLE: chunkWriterImpl.write(timestamp, value.getDouble()); @@ -180,7 +211,86 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { } else { AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriter; alignedChunkWriter.write(timestamp, value.getVector()); + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSize = estimateWrittenPointTotalSize(value); + } + } + chunkPointNumArray[subTaskId]++; + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += writtenPointTotalSize; + } + } + + private long estimateWrittenPointTotalSize(TsPrimitiveType value) { + long size = Long.BYTES; + TsPrimitiveType[] vector = value.getVector(); + for (TsPrimitiveType tsPrimitiveType : vector) { + if (tsPrimitiveType == null) { + continue; + } + TSDataType dataType = tsPrimitiveType.getDataType(); + switch (dataType) { + case TEXT: + case STRING: + case BLOB: + case OBJECT: + size += tsPrimitiveType.getBinary().getLength(); + break; + case DOUBLE: + case INT64: + case TIMESTAMP: + size += Long.BYTES; + break; + case INT32: + case DATE: + case FLOAT: + size += Integer.BYTES; + break; + case BOOLEAN: + size += 1; + break; + default: + break; + } } + return size; + } + + protected long estimateWrittenPointTotalSize(TsBlock tsBlock) { + int pointCount = tsBlock.getPositionCount(); + long size = 0; + Column[] columns = tsBlock.getValueColumns(); + for (Column column : columns) { + TSDataType dataType = column.getDataType(); + if (dataType.isBinary()) { + for (int j = 0; j < pointCount; j++) { + if (column.isNull(j)) { + continue; + } + size += column.getBinary(j).getLength(); + } + continue; + } + // This is only used as a checkpoint estimate, so fixed-width values use count directly. + switch (dataType) { + case DOUBLE: + case INT64: + case TIMESTAMP: + size += (long) Long.BYTES * pointCount; + break; + case INT32: + case DATE: + case FLOAT: + size += (long) Integer.BYTES * pointCount; + break; + case BOOLEAN: + size += pointCount; + break; + default: + break; + } + } + return size; } @SuppressWarnings("squid:S2445") @@ -190,7 +300,14 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { synchronized (targetWriter) { targetWriter.writeChunk(chunkWriter); } + resetChunkWriterStatistics(subTaskId); + } + + private void resetChunkWriterStatistics(int subTaskId) { chunkPointNumArray[subTaskId] = 0; + writtenPointTotalSizeArray[subTaskId] = 0; + lastCheckIndexArray[subTaskId] = 0; + lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = 0; } public abstract EncryptParameter getEncryptParameter(); @@ -214,7 +331,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { synchronized (targetWriter) { // seal last chunk to file writer targetWriter.writeChunk(chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId] = 0; + resetChunkWriterStatistics(subTaskId); targetWriter.writeChunk(chunk, chunkMetadata); } } @@ -232,7 +349,7 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { AlignedChunkWriterImpl alignedChunkWriter = (AlignedChunkWriterImpl) chunkWriters[subTaskId]; // seal last chunk to file writer targetWriter.writeChunk(alignedChunkWriter); - chunkPointNumArray[subTaskId] = 0; + resetChunkWriterStatistics(subTaskId); targetWriter.markStartingWritingAligned(); @@ -279,6 +396,9 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { chunkWriter.writePageHeaderAndDataIntoBuff(compressedPageData, pageHeader); chunkPointNumArray[subTaskId] += pageHeader.getStatistics().getCount(); + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += pageHeader.getSerializedPageSize(); + } } public abstract boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId) @@ -303,29 +423,45 @@ public abstract class AbstractCompactionWriter implements AutoCloseable { // flush new time page to chunk writer directly alignedChunkWriter.writePageHeaderAndDataIntoTimeBuff(compressedTimePageData, timePageHeader); + long writtenValuePageSize = 0; // flush new value pages to chunk writer directly for (int i = 0; i < valuePageHeaders.size(); i++) { - if (valuePageHeaders.get(i) == null) { + PageHeader valuePageHeader = valuePageHeaders.get(i); + if (valuePageHeader == null) { // sub sensor does not exist in current file or value page has been deleted completely alignedChunkWriter.getValueChunkWriterByIndex(i).writeEmptyPageToPageBuffer(); continue; } alignedChunkWriter.writePageHeaderAndDataIntoValueBuff( - compressedValuePageDatas.get(i), valuePageHeaders.get(i), i); + compressedValuePageDatas.get(i), valuePageHeader, i); + if (hasVariableLengthTypeArray[subTaskId]) { + writtenValuePageSize += valuePageHeader.getSerializedPageSize(); + } } chunkPointNumArray[subTaskId] += timePageHeader.getStatistics().getCount(); + if (hasVariableLengthTypeArray[subTaskId]) { + // Direct-flushed pages are already serialized, so use value page size as checkpoint estimate. + writtenPointTotalSizeArray[subTaskId] += writtenValuePageSize; + } } protected void checkChunkSizeAndMayOpenANewChunk( CompactionTsFileWriter fileWriter, IChunkWriter chunkWriter, int subTaskId) throws IOException { - if (chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) * checkPoint) { - // if chunk point num reaches the check point, then check if the chunk size over threshold + boolean reachesPointCheckPoint = + chunkPointNumArray[subTaskId] >= (lastCheckIndexArray[subTaskId] + 1) * checkPoint; + boolean reachesSizeCheckPoint = + hasVariableLengthTypeArray[subTaskId] + && writtenPointTotalSizeArray[subTaskId] + >= (lastWrittenPointTotalSizeCheckIndexArray[subTaskId] + 1) + * writtenPointTotalSizeCheckPoint; + if (reachesPointCheckPoint || reachesSizeCheckPoint) { lastCheckIndexArray[subTaskId] = chunkPointNumArray[subTaskId] / checkPoint; + lastWrittenPointTotalSizeCheckIndexArray[subTaskId] = + writtenPointTotalSizeArray[subTaskId] / writtenPointTotalSizeCheckPoint; if (chunkWriter.checkIsChunkSizeOverThreshold(targetChunkSize, targetChunkPointNum, false)) { sealChunk(fileWriter, chunkWriter, subTaskId); - lastCheckIndexArray[subTaskId] = 0; } } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java index 61cec47d39a..ac4120614de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCrossCompactionWriter.java @@ -150,8 +150,7 @@ public abstract class AbstractCrossCompactionWriter extends AbstractCompactionWr checkTimeAndMayFlushChunkToCurrentFile(timestamp, subTaskId); int fileIndex = seqFileIndexArray[subTaskId]; - writeDataPoint(timestamp, value, chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId]++; + writeDataPoint(timestamp, value, chunkWriters[subTaskId], subTaskId); checkChunkSizeAndMayOpenANewChunk( targetFileWriters.get(fileIndex), chunkWriters[subTaskId], subTaskId); isDeviceExistedInTargetFiles[fileIndex] = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java index 0ace4d5cb97..c59c658708c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractInnerCompactionWriter.java @@ -142,8 +142,8 @@ public abstract class AbstractInnerCompactionWriter extends AbstractCompactionWr @Override public void write(TimeValuePair timeValuePair, int subTaskId) throws IOException { checkPreviousTimestamp(timeValuePair.getTimestamp(), subTaskId); - writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId]++; + writeDataPoint( + timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId); checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); lastTime[subTaskId] = timeValuePair.getTimestamp(); lastTimeSet[subTaskId] = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java index 6810df4d1a3..65b955aa757 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointCrossCompactionWriter.java @@ -68,13 +68,16 @@ public class ReadPointCrossCompactionWriter extends AbstractCrossCompactionWrite checkTimeAndMayFlushChunkToCurrentFile(timestamps.getStartTime(), subTaskId); AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; chunkWriter.write(timestamps, columns, batchSize); + chunkPointNumArray[subTaskId] += batchSize; + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock); + } synchronized (this) { // we need to synchronized here to avoid multi-thread competition in sub-task TsFileResource resource = targetResources.get(seqFileIndexArray[subTaskId]); resource.updateStartTime(deviceId, timestamps.getStartTime()); resource.updateEndTime(deviceId, timestamps.getEndTime()); } - chunkPointNumArray[subTaskId] += timestamps.getTimes().length; checkChunkSizeAndMayOpenANewChunk( targetFileWriters.get(seqFileIndexArray[subTaskId]), chunkWriter, subTaskId); isDeviceExistedInTargetFiles[seqFileIndexArray[subTaskId]] = true; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java index 5530a1995ce..ec266f9cde8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/ReadPointInnerCompactionWriter.java @@ -71,7 +71,10 @@ public class ReadPointInnerCompactionWriter extends AbstractInnerCompactionWrite int batchSize = tsBlock.getPositionCount(); AlignedChunkWriterImpl chunkWriter = (AlignedChunkWriterImpl) this.chunkWriters[subTaskId]; chunkWriter.write(timestamps, columns, batchSize); - chunkPointNumArray[subTaskId] += timestamps.getTimes().length; + chunkPointNumArray[subTaskId] += batchSize; + if (hasVariableLengthTypeArray[subTaskId]) { + writtenPointTotalSizeArray[subTaskId] += estimateWrittenPointTotalSize(tsBlock); + } checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriter, subTaskId); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java index 15d59be8d68..63e8a58a440 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/RepairUnsortedFileCompactionWriter.java @@ -84,8 +84,8 @@ public class RepairUnsortedFileCompactionWriter extends ReadPointInnerCompaction } private void writeToChunkWriter(TimeValuePair timeValuePair, int subTaskId) throws IOException { - writeDataPoint(timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId]); - chunkPointNumArray[subTaskId]++; + writeDataPoint( + timeValuePair.getTimestamp(), timeValuePair.getValue(), chunkWriters[subTaskId], subTaskId); checkChunkSizeAndMayOpenANewChunk(fileWriter, chunkWriters[subTaskId], subTaskId); } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java new file mode 100644 index 00000000000..de28c629215 --- /dev/null +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/compaction/execute/utils/writer/AbstractCompactionWriterTest.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer; + +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.AlignedPageElement; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.executor.fast.element.ChunkMetadataElement; +import org.apache.iotdb.db.storageengine.dataregion.compaction.execute.utils.writer.flushcontroller.AbstractCompactionFlushController; + +import org.apache.tsfile.encrypt.EncryptParameter; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.exception.write.PageException; +import org.apache.tsfile.file.header.PageHeader; +import org.apache.tsfile.file.metadata.ChunkMetadata; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.statistics.Statistics; +import org.apache.tsfile.read.TimeValuePair; +import org.apache.tsfile.read.common.Chunk; +import org.apache.tsfile.read.common.block.TsBlock; +import org.apache.tsfile.write.chunk.ChunkWriterImpl; +import org.apache.tsfile.write.schema.MeasurementSchema; +import org.apache.tsfile.write.schema.Schema; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +public class AbstractCompactionWriterTest { + + private static final int SUB_TASK_ID = 0; + + @Test + public void testBinarySizeCheckpointTriggersChunkSizeCheckBeforePointCheckpoint() + throws IOException, PageException { + TestCompactionWriter compactionWriter = new TestCompactionWriter(); + CountingChunkWriter chunkWriter = new CountingChunkWriter(); + PageHeader pageHeader = + createPageHeader(compactionWriter.getCompressedSizeToReachSizeCheckpoint()); + + compactionWriter.startMeasurement("s1", chunkWriter, SUB_TASK_ID); + compactionWriter.flushNonAlignedPageToChunkWriter( + chunkWriter, ByteBuffer.allocate(0), pageHeader, SUB_TASK_ID); + compactionWriter.checkChunkSizeAndMayOpenANewChunk(null, chunkWriter, SUB_TASK_ID); + + Assert.assertEquals(1, chunkWriter.chunkSizeCheckCount); + } + + private static PageHeader createPageHeader(int compressedSize) { + Statistics<?> statistics = Statistics.getStatsByType(TSDataType.TEXT); + return new PageHeader(compressedSize, compressedSize, statistics); + } + + private static class CountingChunkWriter extends ChunkWriterImpl { + + private int chunkSizeCheckCount; + + private CountingChunkWriter() { + super(new MeasurementSchema("s1", TSDataType.TEXT)); + } + + @Override + public boolean checkIsChunkSizeOverThreshold( + long size, long pointNum, boolean returnTrueIfChunkEmpty) { + chunkSizeCheckCount++; + return false; + } + } + + private static class TestCompactionWriter extends AbstractCompactionWriter { + + private int getCompressedSizeToReachSizeCheckpoint() { + return (int) Math.max(targetChunkSize / 10, 1L); + } + + @Override + public void startChunkGroup(IDeviceID deviceId, boolean isAlign) {} + + @Override + public void endChunkGroup() {} + + @Override + public void endMeasurement(int subTaskId) {} + + @Override + public void write(TimeValuePair timeValuePair, int subTaskId) {} + + @Override + public void write(TsBlock tsBlock, int subTaskId) {} + + @Override + public void endFile() {} + + @Override + public long getWriterSize() { + return 0; + } + + @Override + public void checkAndMayFlushChunkMetadata() {} + + @Override + public EncryptParameter getEncryptParameter() { + return null; + } + + @Override + public boolean flushNonAlignedChunk(Chunk chunk, ChunkMetadata chunkMetadata, int subTaskId) { + return false; + } + + @Override + public boolean flushAlignedChunk(ChunkMetadataElement chunkMetadataElement, int subTaskId) { + return false; + } + + @Override + public boolean flushBatchedValueChunk( + ChunkMetadataElement chunkMetadataElement, + int subTaskId, + AbstractCompactionFlushController flushController) { + return false; + } + + @Override + public boolean flushNonAlignedPage( + ByteBuffer compressedPageData, PageHeader pageHeader, int subTaskId) { + return false; + } + + @Override + public boolean flushAlignedPage(AlignedPageElement alignedPageElement, int subTaskId) { + return false; + } + + @Override + public boolean flushBatchedValuePage( + AlignedPageElement alignedPageElement, + int subTaskId, + AbstractCompactionFlushController flushController) { + return false; + } + + @Override + public void setSchemaForAllTargetFile(List<Schema> schemas) {} + + @Override + public void close() {} + } +}
