This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch support_set_compression_by_type_1.3
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit 85648499afa9eaf00af953ab6262b575e4091af6
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Jun 24 19:05:56 2025 +0800

    Support set default compression by data type (#523)
    
    (cherry picked from commit de166a7d8bd1a5b3a6bdf4749362d583dfb1a9ef)
---
 .../apache/tsfile/common/conf/TSFileConfig.java    | 120 ++++++
 .../tsfile/common/conf/TSFileDescriptor.java       |   6 +
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |   7 +-
 .../tsfile/write/schema/MeasurementSchema.java     |   6 +-
 .../tsfile/write/schema/TimeseriesSchema.java      |   6 +-
 .../write/schema/VectorMeasurementSchema.java      | 107 +++++-
 .../apache/tsfile/write/writer/TsFileIOWriter.java |   4 +
 .../org/apache/tsfile/write/ChunkRewriteTest.java  | 425 +++++++++++++++++++++
 .../apache/tsfile/write/TsFileIOWriterTest.java    |  44 ++-
 9 files changed, 684 insertions(+), 41 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java 
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
index 72cf59be..a7ef6b6e 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
@@ -102,6 +102,42 @@ public class TSFileConfig implements Serializable {
    */
   private String timeEncoding = "TS_2DIFF";
 
+  /** Encoder of boolean column. Default value is RLE. */
+  private String booleanEncoding = "RLE";
+
+  /** Encoder of int32 and date column. Default value is TS_2DIFF. */
+  private String int32Encoding = "TS_2DIFF";
+
+  /** Encoder of int64 and timestamp column. Default value is TS_2DIFF. */
+  private String int64Encoding = "TS_2DIFF";
+
+  /** Encoder of float column. Default value is GORILLA. */
+  private String floatEncoding = "GORILLA";
+
+  /** Encoder of double column. Default value is GORILLA. */
+  private String doubleEncoding = "GORILLA";
+
+  /** Encoder of string, blob and text column. Default value is PLAIN. */
+  private String textEncoding = "PLAIN";
+
+  /** Compression of boolean column. Defaults to the overall compression. */
+  private String booleanCompression = null;
+
+  /** Compression of int32 and date column. Defaults to the overall 
compression. */
+  private String int32Compression = null;
+
+  /** Compression of int64 and timestamp column. Defaults to the overall 
compression. */
+  private String int64Compression = null;
+
+  /** Compression of float column. Defaults to the overall compression. */
+  private String floatCompression = null;
+
+  /** Compression of double column. Defaults to the overall compression. */
+  private String doubleCompression = null;
+
+  /** Compression of string, blob and text column. Defaults to the overall 
compression. */
+  private String textCompression = null;
+
   /**
    * Encoder of value series. default value is PLAIN. For int, long data type, 
TsFile also supports
    * TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float, 
double data type, TsFile
@@ -288,6 +324,66 @@ public class TSFileConfig implements Serializable {
     return valueEncoder;
   }
 
+  public String getValueEncoder(TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return booleanEncoding;
+      case INT32:
+      case DATE:
+        return int32Encoding;
+      case INT64:
+      case TIMESTAMP:
+        return int64Encoding;
+      case FLOAT:
+        return floatEncoding;
+      case DOUBLE:
+        return doubleEncoding;
+      case STRING:
+      case BLOB:
+      case TEXT:
+      default:
+        return textEncoding;
+    }
+  }
+
+  public CompressionType getCompressor(TSDataType dataType) {
+    String compressionName;
+    switch (dataType) {
+      case BOOLEAN:
+        compressionName = booleanCompression;
+        break;
+      case INT32:
+      case DATE:
+        compressionName = int32Compression;
+        break;
+      case INT64:
+      case TIMESTAMP:
+        compressionName = int64Compression;
+        break;
+      case FLOAT:
+        compressionName = floatCompression;
+        break;
+      case DOUBLE:
+        compressionName = doubleCompression;
+        break;
+      case STRING:
+      case BLOB:
+      case TEXT:
+        compressionName = textCompression;
+        break;
+      default:
+        compressionName = null;
+    }
+
+    CompressionType compressionType;
+    if (compressionName != null) {
+      compressionType = CompressionType.valueOf(compressionName);
+    } else {
+      compressionType = compressor;
+    }
+    return compressionType;
+  }
+
   public void setValueEncoder(String valueEncoder) {
     this.valueEncoder = valueEncoder;
   }
@@ -568,4 +664,28 @@ public class TSFileConfig implements Serializable {
   public void setLz4UseJni(boolean lz4UseJni) {
     this.lz4UseJni = lz4UseJni;
   }
+
+  public void setBooleanCompression(String booleanCompression) {
+    this.booleanCompression = booleanCompression;
+  }
+
+  public void setInt32Compression(String int32Compression) {
+    this.int32Compression = int32Compression;
+  }
+
+  public void setInt64Compression(String int64Compression) {
+    this.int64Compression = int64Compression;
+  }
+
+  public void setFloatCompression(String floatCompression) {
+    this.floatCompression = floatCompression;
+  }
+
+  public void setDoubleCompression(String doubleCompression) {
+    this.doubleCompression = doubleCompression;
+  }
+
+  public void setTextCompression(String textCompression) {
+    this.textCompression = textCompression;
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java 
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
index 8f2e2c78..d00220fd 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
@@ -81,6 +81,12 @@ public class TSFileDescriptor {
     writer.setInt(conf::setFloatPrecision, "float_precision");
     writer.setString(conf::setValueEncoder, "value_encoder");
     writer.setString(conf::setCompressor, "compressor");
+    writer.setString(conf::setBooleanCompression, "boolean_compressor");
+    writer.setString(conf::setInt32Compression, "int32_compressor");
+    writer.setString(conf::setInt64Compression, "int64_compressor");
+    writer.setString(conf::setFloatCompression, "float_compressor");
+    writer.setString(conf::setDoubleCompression, "double_compressor");
+    writer.setString(conf::setTextCompression, "text_compressor");
     writer.setInt(conf::setBatchSize, "batch_size");
     writer.setBoolean(conf::setLz4UseJni, "lz4_use_jni");
   }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 22d310c2..515f8e6e 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -56,7 +56,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
     timeChunkWriter =
         new TimeChunkWriter(
             schema.getMeasurementId(),
-            schema.getCompressor(),
+            schema.getTimeCompressor(),
             schema.getTimeTSEncoding(),
             schema.getTimeEncoder());
 
@@ -70,7 +70,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
       valueChunkWriterList.add(
           new ValueChunkWriter(
               valueMeasurementIdList.get(i),
-              schema.getCompressor(),
+              schema.getValueCompressor(i),
               valueTSDataTypeList.get(i),
               valueTSEncodingList.get(i),
               valueEncoderList.get(i)));
@@ -122,7 +122,8 @@ public class AlignedChunkWriterImpl implements IChunkWriter 
{
     TSEncoding timeEncoding =
         
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
     TSDataType timeType = 
TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
-    CompressionType timeCompression = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+    CompressionType timeCompression =
+        
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64);
     timeChunkWriter =
         new TimeChunkWriter(
             "",
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
index 545d9cc2..b3f20ac4 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
@@ -63,8 +63,8 @@ public class MeasurementSchema
     this(
         measurementId,
         tsDataType,
-        
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType),
         null);
   }
 
@@ -74,7 +74,7 @@ public class MeasurementSchema
         measurementId,
         type,
         encoding,
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(type),
         null);
   }
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
index 0c287e85..21c74265 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
@@ -55,8 +55,8 @@ public class TimeseriesSchema implements 
Comparable<TimeseriesSchema>, Serializa
     this(
         fullPath,
         tsDataType,
-        
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder()),
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType),
         Collections.emptyMap());
   }
 
@@ -66,7 +66,7 @@ public class TimeseriesSchema implements 
Comparable<TimeseriesSchema>, Serializa
         fullPath,
         type,
         encoding,
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(type),
         Collections.emptyMap());
   }
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
index 9b157540..33c21479 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
@@ -47,13 +47,19 @@ public class VectorMeasurementSchema
       RamUsageEstimator.shallowSizeOfInstance(VectorMeasurementSchema.class);
   private static final long BUILDER_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class);
+  private static final byte NO_UNIFIED_COMPRESSOR = -1;
 
   private String deviceId;
   private Map<String, Integer> measurementsToIndexMap;
   private byte[] types;
   private byte[] encodings;
   private TSEncodingBuilder[] encodingConverters;
-  private byte compressor;
+
+  /** For compatibility of old versions. */
+  private byte unifiedCompressor;
+
+  /** [0] is for the time column. */
+  private byte[] compressors;
 
   public VectorMeasurementSchema() {}
 
@@ -80,7 +86,34 @@ public class VectorMeasurementSchema
     }
     this.encodings = encodingsInByte;
     this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
-    this.compressor = compressionType.serialize();
+    this.unifiedCompressor = compressionType.serialize();
+  }
+
+  public VectorMeasurementSchema(
+      String deviceId,
+      String[] subMeasurements,
+      TSDataType[] types,
+      TSEncoding[] encodings,
+      byte[] compressors) {
+    this.deviceId = deviceId;
+    this.measurementsToIndexMap = new HashMap<>();
+    for (int i = 0; i < subMeasurements.length; i++) {
+      measurementsToIndexMap.put(subMeasurements[i], i);
+    }
+    byte[] typesInByte = new byte[types.length];
+    for (int i = 0; i < types.length; i++) {
+      typesInByte[i] = types[i].serialize();
+    }
+    this.types = typesInByte;
+
+    byte[] encodingsInByte = new byte[encodings.length];
+    for (int i = 0; i < encodings.length; i++) {
+      encodingsInByte[i] = encodings[i].serialize();
+    }
+    this.encodings = encodingsInByte;
+    this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
+    this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
+    this.compressors = compressors;
   }
 
   public VectorMeasurementSchema(String deviceId, String[] subMeasurements, 
TSDataType[] types) {
@@ -101,7 +134,15 @@ public class VectorMeasurementSchema
               .serialize();
     }
     this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
-    this.compressor = 
TSFileDescriptor.getInstance().getConfig().getCompressor().serialize();
+    this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
+    // the first column is time
+    this.compressors = new byte[subMeasurements.length + 1];
+    compressors[0] =
+        
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64).serialize();
+    for (int i = 0; i < types.length; i++) {
+      compressors[i + 1] =
+          
TSFileDescriptor.getInstance().getConfig().getCompressor(types[i]).serialize();
+    }
   }
 
   public VectorMeasurementSchema(
@@ -124,9 +165,24 @@ public class VectorMeasurementSchema
     return deviceId;
   }
 
+  @Deprecated // Aligned series should not invoke this method
   @Override
   public CompressionType getCompressor() {
-    return CompressionType.deserialize(compressor);
+    throw new UnsupportedOperationException("Aligned series should not invoke 
this method");
+  }
+
+  public CompressionType getTimeCompressor() {
+    if (compressors != null) {
+      return CompressionType.deserialize(compressors[0]);
+    }
+    return CompressionType.deserialize(unifiedCompressor);
+  }
+
+  public CompressionType getValueCompressor(int index) {
+    if (compressors != null) {
+      return CompressionType.deserialize(compressors[index + 1]);
+    }
+    return CompressionType.deserialize(unifiedCompressor);
   }
 
   @Override
@@ -276,7 +332,11 @@ public class VectorMeasurementSchema
     for (byte encoding : encodings) {
       byteLen += ReadWriteIOUtils.write(encoding, buffer);
     }
-    byteLen += ReadWriteIOUtils.write(compressor, buffer);
+    byteLen += ReadWriteIOUtils.write(unifiedCompressor, buffer);
+    if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      buffer.put(compressors);
+      byteLen += compressors.length;
+    }
 
     return byteLen;
   }
@@ -297,7 +357,11 @@ public class VectorMeasurementSchema
     for (byte encoding : encodings) {
       byteLen += ReadWriteIOUtils.write(encoding, outputStream);
     }
-    byteLen += ReadWriteIOUtils.write(compressor, outputStream);
+    byteLen += ReadWriteIOUtils.write(unifiedCompressor, outputStream);
+    if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      outputStream.write(compressors);
+      byteLen += compressors.length;
+    }
 
     return byteLen;
   }
@@ -348,7 +412,15 @@ public class VectorMeasurementSchema
     }
     vectorMeasurementSchema.encodings = encodings;
 
-    vectorMeasurementSchema.compressor = 
ReadWriteIOUtils.readByte(inputStream);
+    vectorMeasurementSchema.unifiedCompressor = 
ReadWriteIOUtils.readByte(inputStream);
+    if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      byte[] compressors = new byte[measurementSize + 1];
+      int read = inputStream.read(compressors);
+      if (read != compressors.length) {
+        throw new IOException("Unexpected end of stream when reading 
compressors");
+      }
+      vectorMeasurementSchema.compressors = compressors;
+    }
     return vectorMeasurementSchema;
   }
 
@@ -375,7 +447,12 @@ public class VectorMeasurementSchema
     }
     vectorMeasurementSchema.encodings = encodings;
 
-    vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(buffer);
+    vectorMeasurementSchema.unifiedCompressor = 
ReadWriteIOUtils.readByte(buffer);
+    if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      byte[] compressors = new byte[measurementSize + 1];
+      buffer.get(compressors);
+      vectorMeasurementSchema.compressors = compressors;
+    }
     return vectorMeasurementSchema;
   }
 
@@ -391,12 +468,13 @@ public class VectorMeasurementSchema
     return Arrays.equals(types, that.types)
         && Arrays.equals(encodings, that.encodings)
         && Objects.equals(deviceId, that.deviceId)
-        && Objects.equals(compressor, that.compressor);
+        && Objects.equals(unifiedCompressor, that.unifiedCompressor)
+        && Arrays.equals(compressors, that.compressors);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(deviceId, types, encodings, compressor);
+    return Objects.hash(deviceId, types, encodings, unifiedCompressor, Arrays.hashCode(compressors));
   }
 
   /** compare by vector name */
@@ -424,7 +502,14 @@ public class VectorMeasurementSchema
           TSEncoding.deserialize(encodings[entry.getValue()]).toString());
       sc.addTail("],");
     }
-    sc.addTail(CompressionType.deserialize(compressor).toString());
+    if (unifiedCompressor != NO_UNIFIED_COMPRESSOR) {
+      sc.addTail(CompressionType.deserialize(unifiedCompressor).toString());
+    } else {
+      for (byte compressor : compressors) {
+        
sc.addTail(CompressionType.deserialize(compressor).toString()).addTail(",");
+      }
+    }
+
     return sc.toString();
   }
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index b7d71972..cc59fd20 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -322,6 +322,10 @@ public class TsFileIOWriter implements AutoCloseable {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public void endFile() throws IOException {
+    if (!canWrite) {
+      return;
+    }
+
     checkInMemoryPathCount();
     readChunkMetadataAndConstructIndexTree();
 
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java 
b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
new file mode 100644
index 00000000..30e36055
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tsfile.write;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.IPageReader;
+import org.apache.tsfile.read.reader.IPointReader;
+import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.read.reader.chunk.TableChunkReader;
+import org.apache.tsfile.read.reader.page.AlignedPageReader;
+import org.apache.tsfile.write.chunk.AlignedChunkWriterImpl;
+import org.apache.tsfile.write.chunk.ChunkWriterImpl;
+import org.apache.tsfile.write.chunk.TimeChunkWriter;
+import org.apache.tsfile.write.chunk.ValueChunkWriter;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.apache.tsfile.write.schema.VectorMeasurementSchema;
+
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ChunkRewriteTest {
+
+  @Test
+  public void AlignedChunkSinglePageTest() throws IOException {
+    String[] measurements = new String[] {"s1", "s2", "s3"};
+    TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, 
TSDataType.DOUBLE};
+    VectorMeasurementSchema measurementSchema =
+        new VectorMeasurementSchema("root.sg.d1", measurements, types);
+    AlignedChunkWriterImpl chunkWriter = new 
AlignedChunkWriterImpl(measurementSchema);
+
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+    List<ValueChunkWriter> valueChunkWriters = 
chunkWriter.getValueChunkWriterList();
+
+    Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+
+    List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+    AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    for (IPageReader page : pageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals(i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+    }
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, 
timeChunk);
+    valueChunks.set(1, newValueChunk);
+    AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+    for (IPageReader page : newPageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals((double) i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+      assertEquals(20, i - 1);
+    }
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+
+    //
+
+  }
+
+  @Test
+  public void AlignedChunkMultiPagesTest() throws IOException {
+    String[] measurements = new String[] {"s1", "s2", "s3"};
+    TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, 
TSDataType.DOUBLE};
+    VectorMeasurementSchema measurementSchema =
+        new VectorMeasurementSchema("root.sg.d1", measurements, types);
+    AlignedChunkWriterImpl chunkWriter = new 
AlignedChunkWriterImpl(measurementSchema);
+
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    for (int time = 21; time <= 40; time++) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+    List<ValueChunkWriter> valueChunkWriters = 
chunkWriter.getValueChunkWriterList();
+
+    Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+    List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+    AlignedChunkReader chunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals(i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+    }
+    assertEquals(40, i - 1);
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, 
timeChunk);
+    valueChunks.set(1, newValueChunk);
+    AlignedChunkReader newChunkReader = new AlignedChunkReader(timeChunk, 
valueChunks);
+    i = 1;
+    List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+    for (IPageReader page : newPageReaders) {
+      IPointReader pointReader = ((AlignedPageReader) 
page).getLazyPointReader();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((float) i, point.getValue().getVector()[0].getValue());
+        assertEquals((double) i, point.getValue().getVector()[1].getValue());
+        assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        i++;
+      }
+    }
+    assertEquals(40, i - 1);
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+  }
+
+  @Test
+  public void AlignedChunkWithNullTest() throws IOException {
+    String[] measurements = new String[] {"s1", "s2", "s3"};
+    TSDataType[] types = new TSDataType[] {TSDataType.FLOAT, TSDataType.INT32, 
TSDataType.DOUBLE};
+    VectorMeasurementSchema measurementSchema =
+        new VectorMeasurementSchema("root.sg.d1", measurements, types);
+    AlignedChunkWriterImpl chunkWriter = new 
AlignedChunkWriterImpl(measurementSchema);
+
+    for (int time = 1; time <= 30; time = time + 3) {
+      chunkWriter.write(time, (float) time, false);
+      chunkWriter.write(time, time, false);
+      chunkWriter.write(time, (double) time, false);
+      chunkWriter.write(time);
+
+      chunkWriter.write(time + 1, (float) (time + 1), true);
+      chunkWriter.write(time + 1, time + 1, false);
+      chunkWriter.write(time + 1, (double) (time + 1), true);
+      chunkWriter.write(time + 1);
+
+      chunkWriter.write(time + 2, (float) (time + 1), true);
+      chunkWriter.write(time + 2, time + 1, true);
+      chunkWriter.write(time + 2, (double) (time + 1), true);
+      chunkWriter.write(time + 2);
+    }
+    chunkWriter.sealCurrentPage();
+
+    TimeChunkWriter timeChunkWriter = chunkWriter.getTimeChunkWriter();
+    List<ValueChunkWriter> valueChunkWriters = 
chunkWriter.getValueChunkWriterList();
+
+    Chunk timeChunk = getTimeChunk(measurementSchema, timeChunkWriter);
+    List<Chunk> valueChunks = getValueChunks(valueChunkWriters);
+
+    TableChunkReader chunkReader = new TableChunkReader(timeChunk, 
valueChunks, null);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      IPointReader pointReader = 
page.getAllSatisfiedPageData().getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        if (i % 3 == 1) {
+          assertEquals((long) i, point.getTimestamp());
+          assertEquals((float) i, point.getValue().getVector()[0].getValue());
+          assertEquals(i, point.getValue().getVector()[1].getValue());
+          assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        } else if (i % 3 == 2) {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertEquals(i, point.getValue().getVector()[1].getValue());
+          assertNull(point.getValue().getVector()[2]);
+        } else {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertNull(point.getValue().getVector()[1]);
+          assertNull(point.getValue().getVector()[2]);
+        }
+        i++;
+      }
+    }
+    assertEquals(30, i - 1);
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newValueChunk = valueChunks.get(1).rewrite(TSDataType.DOUBLE, 
timeChunk);
+    valueChunks.set(1, newValueChunk);
+    TableChunkReader newChunkReader = new TableChunkReader(timeChunk, 
valueChunks, null);
+    i = 1;
+    List<IPageReader> newPageReaders = newChunkReader.loadPageReaderList();
+    for (IPageReader page : newPageReaders) {
+      IPointReader pointReader = 
page.getAllSatisfiedPageData().getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        if (i % 3 == 1) {
+          assertEquals((long) i, point.getTimestamp());
+          assertEquals((float) i, point.getValue().getVector()[0].getValue());
+          assertEquals((double) i, point.getValue().getVector()[1].getValue());
+          assertEquals((double) i, point.getValue().getVector()[2].getValue());
+        } else if (i % 3 == 2) {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertEquals((double) i, point.getValue().getVector()[1].getValue());
+          assertNull(point.getValue().getVector()[2]);
+        } else {
+          assertEquals((long) i, point.getTimestamp());
+          assertNull(point.getValue().getVector()[0]);
+          assertNull(point.getValue().getVector()[1]);
+          assertNull(point.getValue().getVector()[2]);
+        }
+        i++;
+      }
+    }
+    assertEquals(30, i - 1);
+    timeChunk.getData().flip();
+    valueChunks.get(0).getData().flip();
+    valueChunks.get(1).getData().flip();
+    valueChunks.get(2).getData().flip();
+  }
+
+  @Test
+  public void NonAlignedChunkMultiPagesTest() throws IOException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, 
TSEncoding.PLAIN);
+    ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+    for (int time = 21; time <= 40; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+    Chunk newChunk = getChunk(schema, chunkWriter);
+    ChunkReader chunkReader = new ChunkReader(newChunk);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    int i = 1;
+    for (IPageReader page : pageReaders) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals(i, point.getValue().getValue());
+        i++;
+      }
+    }
+    assertEquals(40, i - 1);
+    newChunk.getData().flip();
+    // rewrite INT32->DOUBLE
+    Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE);
+    ChunkReader chunkReader2 = new ChunkReader(newChunk2);
+    List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList();
+    i = 1;
+    for (IPageReader page : pageReaders2) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((double) i, point.getValue().getValue());
+        i++;
+      }
+    }
+  }
+
+  @Test
+  public void NonAlignedChunkSinglePageTest() throws IOException {
+    IMeasurementSchema schema = new MeasurementSchema("s1", TSDataType.INT32, 
TSEncoding.PLAIN);
+    ChunkWriterImpl chunkWriter = new ChunkWriterImpl(schema);
+    for (int time = 1; time <= 20; time++) {
+      chunkWriter.write(time, time);
+    }
+    chunkWriter.sealCurrentPage();
+
+    Chunk newChunk = getChunk(schema, chunkWriter);
+    ChunkReader chunkReader = new ChunkReader(newChunk);
+    List<IPageReader> pageReaders = chunkReader.loadPageReaderList();
+    for (IPageReader page : pageReaders) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals(i, point.getValue().getValue());
+        i++;
+      }
+      assertEquals(20, i - 1);
+    }
+    newChunk.getData().flip();
+    // rewrite FLOAT->DOUBLE
+    Chunk newChunk2 = newChunk.rewrite(TSDataType.DOUBLE);
+    ChunkReader chunkReader2 = new ChunkReader(newChunk2);
+    List<IPageReader> pageReaders2 = chunkReader2.loadPageReaderList();
+    for (IPageReader page : pageReaders2) {
+      BatchData data = page.getAllSatisfiedPageData(true);
+      IPointReader pointReader = data.getBatchDataIterator();
+      int i = 1;
+      while (pointReader.hasNextTimeValuePair()) {
+        TimeValuePair point = pointReader.nextTimeValuePair();
+        assertEquals((long) i, point.getTimestamp());
+        assertEquals((double) i, point.getValue().getValue());
+        i++;
+      }
+    }
+  }
+
+  public Chunk getTimeChunk(
+      VectorMeasurementSchema measurementSchema, TimeChunkWriter 
timeChunkWriter) {
+    ByteBuffer newChunkData = timeChunkWriter.getByteBuffer();
+    ChunkHeader newChunkHeader =
+        new ChunkHeader(
+            measurementSchema.getMeasurementName(),
+            newChunkData.capacity(),
+            TSDataType.VECTOR,
+            measurementSchema.getTimeCompressor(),
+            measurementSchema.getTimeTSEncoding(),
+            timeChunkWriter.getNumOfPages());
+    return new Chunk(newChunkHeader, newChunkData, null, 
timeChunkWriter.getStatistics());
+  }
+
+  public List<Chunk> getValueChunks(List<ValueChunkWriter> valueChunkWriters) {
+    List<Chunk> valueChunks = new ArrayList<>();
+    for (ValueChunkWriter valueChunkWriter : valueChunkWriters) {
+      ByteBuffer valueChunkData = valueChunkWriter.getByteBuffer();
+      ChunkHeader valueChunkHeader =
+          new ChunkHeader(
+              valueChunkWriter.getMeasurementId(),
+              valueChunkData.capacity(),
+              valueChunkWriter.getDataType(),
+              valueChunkWriter.getCompressionType(),
+              valueChunkWriter.getEncodingType(),
+              valueChunkWriter.getNumOfPages());
+      Chunk valueChunk =
+          new Chunk(valueChunkHeader, valueChunkData, null, 
valueChunkWriter.getStatistics());
+      valueChunks.add(valueChunk);
+    }
+    return valueChunks;
+  }
+
+  public Chunk getChunk(IMeasurementSchema schema, ChunkWriterImpl 
chunkWriter) {
+    ByteBuffer newChunkData = chunkWriter.getByteBuffer();
+    ChunkHeader newChunkHeader =
+        new ChunkHeader(
+            schema.getMeasurementName(),
+            newChunkData.capacity(),
+            schema.getType(),
+            schema.getCompressor(),
+            schema.getEncodingType(),
+            chunkWriter.getNumOfPages());
+    return new Chunk(newChunkHeader, newChunkData, null, 
chunkWriter.getStatistics());
+  }
+}
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java 
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
index dbda5319..7a385c0f 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
@@ -53,6 +53,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.junit.Assert.assertEquals;
+
 public class TsFileIOWriterTest {
 
   private static final String FILE_PATH =
@@ -103,9 +105,9 @@ public class TsFileIOWriterTest {
     TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH);
 
     // magic_string
-    Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic());
-    Assert.assertEquals(TSFileConfig.VERSION_NUMBER, 
reader.readVersionNumber());
-    Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic());
+    assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic());
+    assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber());
+    assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic());
 
     reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + 1);
 
@@ -113,39 +115,39 @@ public class TsFileIOWriterTest {
     ChunkGroupHeader chunkGroupHeader;
     for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
       // chunk group header
-      Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
+      assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
       chunkGroupHeader = reader.readChunkGroupHeader();
-      Assert.assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID());
+      assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID());
       // ordinary chunk header
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals(SENSOR_1, header.getMeasurementID());
+      assertEquals(SENSOR_1, header.getMeasurementID());
     }
 
     for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
       // chunk group header
-      Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
+      assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
       chunkGroupHeader = reader.readChunkGroupHeader();
-      Assert.assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID());
+      assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID());
       // vector chunk header (time)
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, 
reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals("", header.getMeasurementID());
+      assertEquals("", header.getMeasurementID());
       // vector chunk header (values)
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals("s1", header.getMeasurementID());
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals("s1", header.getMeasurementID());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals("s2", header.getMeasurementID());
+      assertEquals("s2", header.getMeasurementID());
     }
 
-    Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
+    assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
     reader.readPlanIndex();
-    Assert.assertEquals(100, reader.getMinPlanIndex());
-    Assert.assertEquals(10000, reader.getMaxPlanIndex());
+    assertEquals(100, reader.getMinPlanIndex());
+    assertEquals(10000, reader.getMaxPlanIndex());
 
-    Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker());
+    assertEquals(MetaMarker.SEPARATOR, reader.readMarker());
 
     // make sure timeseriesMetadata is only
     Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
@@ -198,7 +200,7 @@ public class TsFileIOWriterTest {
       // vector chunk (time)
       writer.startFlushChunk(
           vectorMeasurementSchema.getMeasurementId(),
-          vectorMeasurementSchema.getCompressor(),
+          vectorMeasurementSchema.getTimeCompressor(),
           vectorMeasurementSchema.getType(),
           vectorMeasurementSchema.getTimeTSEncoding(),
           Statistics.getStatsByType(vectorMeasurementSchema.getType()),
@@ -214,7 +216,7 @@ public class TsFileIOWriterTest {
         subStatistics.updateStats(0L, 0L);
         writer.startFlushChunk(
             vectorMeasurementSchema.getSubMeasurementsList().get(j),
-            vectorMeasurementSchema.getCompressor(),
+            vectorMeasurementSchema.getValueCompressor(j),
             vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j),
             vectorMeasurementSchema.getSubMeasurementsTSEncodingList().get(j),
             subStatistics,


Reply via email to