This is an automated email from the ASF dual-hosted git repository. haonan pushed a commit to branch iotdb in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 0ae8ddabf632d6c130d86a51fe5654af0d66b75d Author: shuwenwei <[email protected]> AuthorDate: Fri Jul 5 17:51:21 2024 +0800 support separated aligned chunk (#151) --- .../java/org/apache/tsfile/TsFileSequenceRead.java | 76 ++++++++++++++++------ .../apache/tsfile/read/TsFileSequenceReader.java | 30 +++++++-- .../tsfile/read/reader/page/AlignedPageReader.java | 8 +++ .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 14 ++-- .../apache/tsfile/write/chunk/TimeChunkWriter.java | 14 ++-- 5 files changed, 104 insertions(+), 38 deletions(-) diff --git a/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java b/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java index 5e731479..5e708f1f 100644 --- a/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java +++ b/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java @@ -35,9 +35,8 @@ import org.apache.tsfile.fileSystem.FSFactoryProducer; import org.apache.tsfile.read.TsFileSequenceReader; import org.apache.tsfile.read.common.BatchData; import org.apache.tsfile.read.reader.page.PageReader; -import org.apache.tsfile.read.reader.page.TimePageReader; -import org.apache.tsfile.read.reader.page.ValuePageReader; -import org.apache.tsfile.utils.TsPrimitiveType; +import org.apache.tsfile.utils.ReadWriteIOUtils; +import org.apache.tsfile.write.UnSupportedDataTypeException; import java.io.IOException; import java.nio.ByteBuffer; @@ -50,6 +49,7 @@ public class TsFileSequenceRead { // if you wanna print detailed datas in pages, then turn it true. private static boolean printDetail = false; public static final String POINT_IN_PAGE = "\t\tpoints in the page: "; + private static int MASK = 0x80; @SuppressWarnings({ "squid:S3776", @@ -123,30 +123,64 @@ public class TsFileSequenceRead { "\t\tCompressed page data size: " + pageHeader.getCompressedSize()); if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK) == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk - TimePageReader timePageReader = - new TimePageReader(pageHeader, pageData, defaultTimeDecoder); - timeBatch.add(timePageReader.getNextTimeBatch()); - System.out.println(POINT_IN_PAGE + timeBatch.get(pageIndex).length); - if (printDetail) { - for (int i = 0; i < timeBatch.get(pageIndex).length; i++) { - System.out.println("\t\t\ttime: " + timeBatch.get(pageIndex)[i]); + Decoder decoder = + Decoder.getDecoderByType(header.getEncodingType(), header.getDataType()); + while (decoder.hasNext(pageData)) { + long currentTime = decoder.readLong(pageData); + if (printDetail) { + System.out.println("\t\t\ttime: " + currentTime); } } } else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK) == TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk - ValuePageReader valuePageReader = - new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder); - TsPrimitiveType[] valueBatch = - valuePageReader.nextValueBatch(timeBatch.get(pageIndex)); - if (valueBatch.length == 0) { - System.out.println("\t\t-- Empty Page "); - } else { - System.out.println(POINT_IN_PAGE + valueBatch.length); + int pointNum = 0; + byte[] bitmap = null; + if (pageData.hasRemaining()) { + int size = ReadWriteIOUtils.readInt(pageData); + bitmap = new byte[(size + 7) / 8]; + pageData.get(bitmap); } - if (printDetail) { - for (TsPrimitiveType batch : valueBatch) { - System.out.println("\t\t\tvalue: " + batch); + while (valueDecoder.hasNext(pageData)) { + pointNum++; + int idx = pointNum - 1; + if (((bitmap[idx / 8] & 0xFF) & (MASK >>> (idx % 8))) == 0) { + if (printDetail) { + System.out.println("\t\t\tvalue: " + null); + } + continue; + } + Object value; + switch (header.getDataType()) { + case BOOLEAN: + value = valueDecoder.readBoolean(pageData); + break; + case INT32: + value = valueDecoder.readInt(pageData); + break; + case INT64: + value = valueDecoder.readLong(pageData); + break; + case FLOAT: + value = valueDecoder.readFloat(pageData); + break; + case DOUBLE: + value = valueDecoder.readDouble(pageData); + break; + case TEXT: + value = valueDecoder.readBinary(pageData); + break; + default: + throw new UnSupportedDataTypeException(String.valueOf(header.getDataType())); } + if (printDetail) { + System.out.println("\t\t\tvalue: " + value); + } + } + pageData.flip(); + if (pointNum == 0) { + System.out.println("\t\t-- Empty Page "); + } else { + System.out.println(POINT_IN_PAGE + pointNum); } } else { // NonAligned Chunk PageReader pageReader = diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java index 7107242e..fa9b8ab3 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java @@ -1755,6 +1755,7 @@ public class TsFileSequenceReader implements AutoCloseable { List<long[]> timeBatch = new ArrayList<>(); IDeviceID lastDeviceId = null; List<IMeasurementSchema> measurementSchemaList = new ArrayList<>(); + Map<String, Integer> valueColumn2TimeBatchIndex = new HashMap<>(); try { while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) { switch (marker) { @@ -1778,17 +1779,24 @@ public class TsFileSequenceReader implements AutoCloseable { chunkHeader.getCompressionType()); measurementSchemaList.add(measurementSchema); dataType = chunkHeader.getDataType(); - if (chunkHeader.getDataType() == TSDataType.VECTOR) { - timeBatch.clear(); - } + Statistics<? extends Serializable> chunkStatistics = Statistics.getStatsByType(dataType); int dataSize = chunkHeader.getDataSize(); if (dataSize > 0) { + if (marker == MetaMarker.TIME_CHUNK_HEADER) { + timeBatch.add(null); + } if (((byte) (chunkHeader.getChunkType() & 0x3F)) == MetaMarker .CHUNK_HEADER) { // more than one page, we could use page statistics to + if (marker == MetaMarker.VALUE_CHUNK_HEADER) { + int timeBatchIndex = + valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0); + valueColumn2TimeBatchIndex.put( + chunkHeader.getMeasurementID(), timeBatchIndex + 1); + } // generate chunk statistic while (dataSize > 0) { // a new Page @@ -1830,7 +1838,12 @@ public class TsFileSequenceReader implements AutoCloseable { ValuePageReader valuePageReader = new ValuePageReader( pageHeader, pageData, chunkHeader.getDataType(), valueDecoder); - TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(0)); + int timeBatchIndex = + valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0); + valueColumn2TimeBatchIndex.put( + chunkHeader.getMeasurementID(), timeBatchIndex + 1); + TsPrimitiveType[] valueBatch = + valuePageReader.nextValueBatch(timeBatch.get(timeBatchIndex)); if (valueBatch != null && valueBatch.length != 0) { for (int i = 0; i < valueBatch.length; i++) { @@ -1838,7 +1851,7 @@ public class TsFileSequenceReader implements AutoCloseable { if (value == null) { continue; } - long timeStamp = timeBatch.get(0)[i]; + long timeStamp = timeBatch.get(timeBatchIndex)[i]; switch (dataType) { case INT32: case DATE: @@ -1909,6 +1922,11 @@ public class TsFileSequenceReader implements AutoCloseable { } chunkHeader.increasePageNums(1); } + } else if (marker == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER + || marker == MetaMarker.VALUE_CHUNK_HEADER) { + int timeBatchIndex = + valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0); + valueColumn2TimeBatchIndex.put(chunkHeader.getMeasurementID(), timeBatchIndex + 1); } currentChunk = new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics); @@ -1934,6 +1952,8 @@ public class TsFileSequenceReader implements AutoCloseable { chunkMetadataList = new ArrayList<>(); ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader(); lastDeviceId = chunkGroupHeader.getDeviceID(); + timeBatch.clear(); + valueColumn2TimeBatchIndex.clear(); break; case MetaMarker.OPERATION_INDEX_RANGE: truncatedSize = this.position() - 1; diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java index 75f664f5..5307ed35 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java @@ -469,4 +469,12 @@ public class AlignedPageReader implements IPageReader { builder = new TsBlockBuilder((int) timePageReader.getStatistics().getCount(), dataTypes); } } + + public TimePageReader getTimePageReader() { + return timePageReader; + } + + public List<ValuePageReader> getValuePageReaderList() { + return valuePageReaderList; + } } diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java index d5fddb83..22d310c2 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java @@ -42,12 +42,14 @@ import java.util.List; public class AlignedChunkWriterImpl implements IChunkWriter { - private final TimeChunkWriter timeChunkWriter; - private final List<ValueChunkWriter> valueChunkWriterList; - private int valueIndex; + protected TimeChunkWriter timeChunkWriter; + protected List<ValueChunkWriter> valueChunkWriterList; + protected int valueIndex; // Used for batch writing - private long remainingPointsNumber; + protected long remainingPointsNumber; + + protected AlignedChunkWriterImpl() {} // TestOnly public AlignedChunkWriterImpl(VectorMeasurementSchema schema) { @@ -330,7 +332,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter { * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it * to pageBuffer */ - private boolean checkPageSizeAndMayOpenANewPage() { + protected boolean checkPageSizeAndMayOpenANewPage() { if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) { return true; } @@ -342,7 +344,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter { return false; } - private void writePageToPageBuffer() { + protected void writePageToPageBuffer() { timeChunkWriter.writePageToPageBuffer(); for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) { valueChunkWriter.writePageToPageBuffer(); diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java index b12e2e53..756031fc 100644 --- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java +++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java @@ -47,14 +47,14 @@ public class TimeChunkWriter { private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class); - private final String measurementId; + private String measurementId; - private final TSEncoding encodingType; + private TSEncoding encodingType; - private final CompressionType compressionType; + private CompressionType compressionType; /** all pages of this chunk. */ - private final PublicBAOS pageBuffer; + private PublicBAOS pageBuffer; private int numOfPages; @@ -62,9 +62,9 @@ public class TimeChunkWriter { private TimePageWriter pageWriter; /** page size threshold. */ - private final long pageSizeThreshold; + private long pageSizeThreshold; - private final int maxNumberOfPointsInPage; + private int maxNumberOfPointsInPage; /** value count in current page. */ private int valueCountInOnePageForNextCheck; @@ -80,6 +80,8 @@ public class TimeChunkWriter { private Statistics<?> firstPageStatistics; + protected TimeChunkWriter() {} + public TimeChunkWriter( String measurementId, CompressionType compressionType,
