This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 5905c4f9 support separated aligned chunk (#151)
5905c4f9 is described below

commit 5905c4f98d75b018ac2b4af32efece8518a0f08e
Author: shuwenwei <[email protected]>
AuthorDate: Fri Jul 5 17:51:21 2024 +0800

    support separated aligned chunk (#151)
---
 .../java/org/apache/tsfile/TsFileSequenceRead.java | 76 ++++++++++++++++------
 .../apache/tsfile/read/TsFileSequenceReader.java   | 30 +++++++--
 .../tsfile/read/reader/page/AlignedPageReader.java |  8 +++
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java | 14 ++--
 .../apache/tsfile/write/chunk/TimeChunkWriter.java | 14 ++--
 5 files changed, 104 insertions(+), 38 deletions(-)

diff --git a/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java b/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java
index 5e731479..5e708f1f 100644
--- a/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java
+++ b/java/examples/src/main/java/org/apache/tsfile/TsFileSequenceRead.java
@@ -35,9 +35,8 @@ import org.apache.tsfile.fileSystem.FSFactoryProducer;
 import org.apache.tsfile.read.TsFileSequenceReader;
 import org.apache.tsfile.read.common.BatchData;
 import org.apache.tsfile.read.reader.page.PageReader;
-import org.apache.tsfile.read.reader.page.TimePageReader;
-import org.apache.tsfile.read.reader.page.ValuePageReader;
-import org.apache.tsfile.utils.TsPrimitiveType;
+import org.apache.tsfile.utils.ReadWriteIOUtils;
+import org.apache.tsfile.write.UnSupportedDataTypeException;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -50,6 +49,7 @@ public class TsFileSequenceRead {
   // if you wanna print detailed datas in pages, then turn it true.
   private static boolean printDetail = false;
   public static final String POINT_IN_PAGE = "\t\tpoints in the page: ";
+  private static int MASK = 0x80;
 
   @SuppressWarnings({
     "squid:S3776",
@@ -123,30 +123,64 @@ public class TsFileSequenceRead {
                   "\t\tCompressed page data size: " + pageHeader.getCompressedSize());
               if ((header.getChunkType() & TsFileConstant.TIME_COLUMN_MASK)
                   == TsFileConstant.TIME_COLUMN_MASK) { // Time Chunk
-                TimePageReader timePageReader =
-                    new TimePageReader(pageHeader, pageData, defaultTimeDecoder);
-                timeBatch.add(timePageReader.getNextTimeBatch());
-                System.out.println(POINT_IN_PAGE + timeBatch.get(pageIndex).length);
-                if (printDetail) {
-                  for (int i = 0; i < timeBatch.get(pageIndex).length; i++) {
-                    System.out.println("\t\t\ttime: " + timeBatch.get(pageIndex)[i]);
+                Decoder decoder =
+                    Decoder.getDecoderByType(header.getEncodingType(), header.getDataType());
+                while (decoder.hasNext(pageData)) {
+                  long currentTime = decoder.readLong(pageData);
+                  if (printDetail) {
+                    System.out.println("\t\t\ttime: " + currentTime);
                   }
                 }
               } else if ((header.getChunkType() & TsFileConstant.VALUE_COLUMN_MASK)
                   == TsFileConstant.VALUE_COLUMN_MASK) { // Value Chunk
-                ValuePageReader valuePageReader =
-                    new ValuePageReader(pageHeader, pageData, header.getDataType(), valueDecoder);
-                TsPrimitiveType[] valueBatch =
-                    valuePageReader.nextValueBatch(timeBatch.get(pageIndex));
-                if (valueBatch.length == 0) {
-                  System.out.println("\t\t-- Empty Page ");
-                } else {
-                  System.out.println(POINT_IN_PAGE + valueBatch.length);
+                int pointNum = 0;
+                byte[] bitmap = null;
+                if (pageData.hasRemaining()) {
+                  int size = ReadWriteIOUtils.readInt(pageData);
+                  bitmap = new byte[(size + 7) / 8];
+                  pageData.get(bitmap);
                 }
-                if (printDetail) {
-                  for (TsPrimitiveType batch : valueBatch) {
-                    System.out.println("\t\t\tvalue: " + batch);
+                while (valueDecoder.hasNext(pageData)) {
+                  pointNum++;
+                  int idx = pointNum - 1;
+                  if (((bitmap[idx / 8] & 0xFF) & (MASK >>> (idx % 8))) == 0) {
+                    if (printDetail) {
+                      System.out.println("\t\t\tvalue: " + null);
+                    }
+                    continue;
+                  }
+                  Object value;
+                  switch (header.getDataType()) {
+                    case BOOLEAN:
+                      value = valueDecoder.readBoolean(pageData);
+                      break;
+                    case INT32:
+                      value = valueDecoder.readInt(pageData);
+                      break;
+                    case INT64:
+                      value = valueDecoder.readLong(pageData);
+                      break;
+                    case FLOAT:
+                      value = valueDecoder.readFloat(pageData);
+                      break;
+                    case DOUBLE:
+                      value = valueDecoder.readDouble(pageData);
+                      break;
+                    case TEXT:
+                      value = valueDecoder.readBinary(pageData);
+                      break;
+                    default:
+                      throw new UnSupportedDataTypeException(String.valueOf(header.getDataType()));
                   }
+                  if (printDetail) {
+                    System.out.println("\t\t\tvalue: " + value);
+                  }
+                }
+                pageData.flip();
+                if (pointNum == 0) {
+                  System.out.println("\t\t-- Empty Page ");
+                } else {
+                  System.out.println(POINT_IN_PAGE + pointNum);
                 }
               } else { // NonAligned Chunk
                 PageReader pageReader =
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index 7107242e..fa9b8ab3 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -1755,6 +1755,7 @@ public class TsFileSequenceReader implements AutoCloseable {
     List<long[]> timeBatch = new ArrayList<>();
     IDeviceID lastDeviceId = null;
     List<IMeasurementSchema> measurementSchemaList = new ArrayList<>();
+    Map<String, Integer> valueColumn2TimeBatchIndex = new HashMap<>();
     try {
       while ((marker = this.readMarker()) != MetaMarker.SEPARATOR) {
         switch (marker) {
@@ -1778,17 +1779,24 @@ public class TsFileSequenceReader implements AutoCloseable {
                     chunkHeader.getCompressionType());
             measurementSchemaList.add(measurementSchema);
             dataType = chunkHeader.getDataType();
-            if (chunkHeader.getDataType() == TSDataType.VECTOR) {
-              timeBatch.clear();
-            }
+
             Statistics<? extends Serializable> chunkStatistics =
                 Statistics.getStatsByType(dataType);
             int dataSize = chunkHeader.getDataSize();
 
             if (dataSize > 0) {
+              if (marker == MetaMarker.TIME_CHUNK_HEADER) {
+                timeBatch.add(null);
+              }
               if (((byte) (chunkHeader.getChunkType() & 0x3F))
                   == MetaMarker
                       .CHUNK_HEADER) { // more than one page, we could use page statistics to
+                if (marker == MetaMarker.VALUE_CHUNK_HEADER) {
+                  int timeBatchIndex =
+                      valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+                  valueColumn2TimeBatchIndex.put(
+                      chunkHeader.getMeasurementID(), timeBatchIndex + 1);
+                }
                 // generate chunk statistic
                 while (dataSize > 0) {
                   // a new Page
@@ -1830,7 +1838,12 @@ public class TsFileSequenceReader implements AutoCloseable {
                   ValuePageReader valuePageReader =
                       new ValuePageReader(
                           pageHeader, pageData, chunkHeader.getDataType(), valueDecoder);
-                  TsPrimitiveType[] valueBatch = valuePageReader.nextValueBatch(timeBatch.get(0));
+                  int timeBatchIndex =
+                      valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+                  valueColumn2TimeBatchIndex.put(
+                      chunkHeader.getMeasurementID(), timeBatchIndex + 1);
+                  TsPrimitiveType[] valueBatch =
+                      valuePageReader.nextValueBatch(timeBatch.get(timeBatchIndex));
 
                   if (valueBatch != null && valueBatch.length != 0) {
                     for (int i = 0; i < valueBatch.length; i++) {
@@ -1838,7 +1851,7 @@ public class TsFileSequenceReader implements AutoCloseable {
                       if (value == null) {
                         continue;
                       }
-                      long timeStamp = timeBatch.get(0)[i];
+                      long timeStamp = timeBatch.get(timeBatchIndex)[i];
                       switch (dataType) {
                         case INT32:
                         case DATE:
@@ -1909,6 +1922,11 @@ public class TsFileSequenceReader implements AutoCloseable {
                 }
                 chunkHeader.increasePageNums(1);
               }
+            } else if (marker == MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER
+                || marker == MetaMarker.VALUE_CHUNK_HEADER) {
+              int timeBatchIndex =
+                  valueColumn2TimeBatchIndex.getOrDefault(chunkHeader.getMeasurementID(), 0);
+              valueColumn2TimeBatchIndex.put(chunkHeader.getMeasurementID(), timeBatchIndex + 1);
             }
             currentChunk =
                 new ChunkMetadata(measurementID, dataType, fileOffsetOfChunk, chunkStatistics);
@@ -1934,6 +1952,8 @@ public class TsFileSequenceReader implements AutoCloseable {
             chunkMetadataList = new ArrayList<>();
             ChunkGroupHeader chunkGroupHeader = this.readChunkGroupHeader();
             lastDeviceId = chunkGroupHeader.getDeviceID();
+            timeBatch.clear();
+            valueColumn2TimeBatchIndex.clear();
             break;
           case MetaMarker.OPERATION_INDEX_RANGE:
             truncatedSize = this.position() - 1;
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
index 75f664f5..5307ed35 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/page/AlignedPageReader.java
@@ -469,4 +469,12 @@ public class AlignedPageReader implements IPageReader {
       builder = new TsBlockBuilder((int) timePageReader.getStatistics().getCount(), dataTypes);
     }
   }
+
+  public TimePageReader getTimePageReader() {
+    return timePageReader;
+  }
+
+  public List<ValuePageReader> getValuePageReaderList() {
+    return valuePageReaderList;
+  }
 }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
index d5fddb83..22d310c2 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -42,12 +42,14 @@ import java.util.List;
 
 public class AlignedChunkWriterImpl implements IChunkWriter {
 
-  private final TimeChunkWriter timeChunkWriter;
-  private final List<ValueChunkWriter> valueChunkWriterList;
-  private int valueIndex;
+  protected TimeChunkWriter timeChunkWriter;
+  protected List<ValueChunkWriter> valueChunkWriterList;
+  protected int valueIndex;
 
   // Used for batch writing
-  private long remainingPointsNumber;
+  protected long remainingPointsNumber;
+
+  protected AlignedChunkWriterImpl() {}
 
   // TestOnly
   public AlignedChunkWriterImpl(VectorMeasurementSchema schema) {
@@ -330,7 +332,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
    * check occupied memory size, if it exceeds the PageSize threshold, construct a page and put it
    * to pageBuffer
    */
-  private boolean checkPageSizeAndMayOpenANewPage() {
+  protected boolean checkPageSizeAndMayOpenANewPage() {
     if (timeChunkWriter.checkPageSizeAndMayOpenANewPage()) {
       return true;
     }
@@ -342,7 +344,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
     return false;
   }
 
-  private void writePageToPageBuffer() {
+  protected void writePageToPageBuffer() {
     timeChunkWriter.writePageToPageBuffer();
     for (ValueChunkWriter valueChunkWriter : valueChunkWriterList) {
       valueChunkWriter.writePageToPageBuffer();
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
index b12e2e53..756031fc 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/TimeChunkWriter.java
@@ -47,14 +47,14 @@ public class TimeChunkWriter {
 
   private static final Logger logger = LoggerFactory.getLogger(TimeChunkWriter.class);
 
-  private final String measurementId;
+  private String measurementId;
 
-  private final TSEncoding encodingType;
+  private TSEncoding encodingType;
 
-  private final CompressionType compressionType;
+  private CompressionType compressionType;
 
   /** all pages of this chunk. */
-  private final PublicBAOS pageBuffer;
+  private PublicBAOS pageBuffer;
 
   private int numOfPages;
 
@@ -62,9 +62,9 @@ public class TimeChunkWriter {
   private TimePageWriter pageWriter;
 
   /** page size threshold. */
-  private final long pageSizeThreshold;
+  private long pageSizeThreshold;
 
-  private final int maxNumberOfPointsInPage;
+  private int maxNumberOfPointsInPage;
 
   /** value count in current page. */
   private int valueCountInOnePageForNextCheck;
@@ -80,6 +80,8 @@ public class TimeChunkWriter {
 
   private Statistics<?> firstPageStatistics;
 
+  protected TimeChunkWriter() {}
+
   public TimeChunkWriter(
       String measurementId,
       CompressionType compressionType,

Reply via email to