This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new de166a7d Support set default compression by data type (#523)
de166a7d is described below

commit de166a7d8bd1a5b3a6bdf4749362d583dfb1a9ef
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Jun 24 19:05:56 2025 +0800

    Support set default compression by data type (#523)
---
 .../apache/tsfile/common/conf/TSFileConfig.java    |  80 ++++++++++++
 .../tsfile/common/conf/TSFileDescriptor.java       |   6 +
 .../tsfile/write/chunk/AlignedChunkWriterImpl.java |  11 +-
 .../tsfile/write/schema/MeasurementSchema.java     |   4 +-
 .../tsfile/write/schema/TimeseriesSchema.java      |   4 +-
 .../write/schema/VectorMeasurementSchema.java      | 107 ++++++++++++++--
 .../apache/tsfile/write/writer/TsFileIOWriter.java |   4 +
 .../org/apache/tsfile/write/ChunkRewriteTest.java  |   2 +-
 .../apache/tsfile/write/TsFileIOWriterTest.java    | 137 +++++++++++++++++----
 9 files changed, 312 insertions(+), 43 deletions(-)

diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java 
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
index a987bf34..709af3c0 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
@@ -126,6 +126,24 @@ public class TSFileConfig implements Serializable {
   /** Encoder of string, blob and text column. Default value is PLAIN. */
   private String textEncoding = "PLAIN";
 
+  // Per-data-type compression overrides, stored as CompressionType names and parsed on use by
+  // getCompressor(TSDataType).
+  /** Compression of boolean columns. When null, the overall compression is used. */
+  private String booleanCompression = null;
+
+  /** Compression of int32 and date columns. When null, the overall compression is used. */
+  private String int32Compression = null;
+
+  /** Compression of int64 and timestamp columns. When null, the overall compression is used. */
+  private String int64Compression = null;
+
+  /** Compression of float columns. When null, the overall compression is used. */
+  private String floatCompression = null;
+
+  /** Compression of double columns. When null, the overall compression is used. */
+  private String doubleCompression = null;
+
+  /** Compression of string, blob and text columns. When null, the overall compression is used. */
+  private String textCompression = null;
+
   /**
    * Encoder of value series. default value is PLAIN. For int, long data type, 
TsFile also supports
    * TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float, 
double data type, TsFile
@@ -361,6 +379,44 @@ public class TSFileConfig implements Serializable {
     }
   }
 
+  /**
+   * Resolves the compression to use for columns of the given data type.
+   *
+   * <p>When a per-type override has been configured for {@code dataType}, that name is parsed
+   * with {@link CompressionType#valueOf(String)}. Otherwise, including for data types that have
+   * no override slot, the overall {@code compressor} is returned.
+   *
+   * @param dataType data type of the column being written
+   * @return the compression type for that column
+   * @throws IllegalArgumentException if the configured override is not a {@code CompressionType}
+   *     constant name
+   */
+  public CompressionType getCompressor(TSDataType dataType) {
+    String override = compressionOverrideFor(dataType);
+    if (override == null) {
+      return compressor;
+    }
+    return CompressionType.valueOf(override);
+  }
+
+  /** Returns the configured compression name for {@code dataType}, or null when none applies. */
+  private String compressionOverrideFor(TSDataType dataType) {
+    switch (dataType) {
+      case BOOLEAN:
+        return booleanCompression;
+      case INT32:
+      case DATE:
+        return int32Compression;
+      case INT64:
+      case TIMESTAMP:
+        return int64Compression;
+      case FLOAT:
+        return floatCompression;
+      case DOUBLE:
+        return doubleCompression;
+      case STRING:
+      case BLOB:
+      case TEXT:
+        return textCompression;
+      default:
+        return null;
+    }
+  }
+
   public void setValueEncoder(String valueEncoder) {
     this.valueEncoder = valueEncoder;
   }
@@ -689,4 +745,28 @@ public class TSFileConfig implements Serializable {
   public void setLz4UseJni(boolean lz4UseJni) {
     this.lz4UseJni = lz4UseJni;
   }
+
+  /** Sets the boolean compression name; {@code null} uses the overall compressor. */
+  public void setBooleanCompression(String booleanCompression) {
+    this.booleanCompression = booleanCompression;
+  }
+
+  /** Sets the int32/date compression name; {@code null} uses the overall compressor. */
+  public void setInt32Compression(String int32Compression) {
+    this.int32Compression = int32Compression;
+  }
+
+  /** Sets the int64/timestamp compression name; {@code null} uses the overall compressor. */
+  public void setInt64Compression(String int64Compression) {
+    this.int64Compression = int64Compression;
+  }
+
+  /** Sets the float compression name; {@code null} uses the overall compressor. */
+  public void setFloatCompression(String floatCompression) {
+    this.floatCompression = floatCompression;
+  }
+
+  /** Sets the double compression name; {@code null} uses the overall compressor. */
+  public void setDoubleCompression(String doubleCompression) {
+    this.doubleCompression = doubleCompression;
+  }
+
+  /** Sets the string/blob/text compression name; {@code null} uses the overall compressor. */
+  public void setTextCompression(String textCompression) {
+    this.textCompression = textCompression;
+  }
 }
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java 
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
index 435561d9..01b78316 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
@@ -81,6 +81,12 @@ public class TSFileDescriptor {
     writer.setInt(conf::setFloatPrecision, "float_precision");
     writer.setString(conf::setValueEncoder, "value_encoder");
     writer.setString(conf::setCompressor, "compressor");
+    writer.setString(conf::setBooleanCompression, "boolean_compressor");
+    writer.setString(conf::setInt32Compression, "int32_compressor");
+    writer.setString(conf::setInt64Compression, "int64_compressor");
+    writer.setString(conf::setFloatCompression, "float_compressor");
+    writer.setString(conf::setDoubleCompression, "double_compressor");
+    writer.setString(conf::setTextCompression, "text_compressor");
     writer.setInt(conf::setBatchSize, "batch_size");
     writer.setString(conf::setEncryptType, "encrypt_type");
     writer.setBoolean(conf::setLz4UseJni, "lz4_use_jni");
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 49ec4d7f..2ec4bd8f 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -61,7 +61,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
     timeChunkWriter =
         new TimeChunkWriter(
             schema.getMeasurementName(),
-            schema.getCompressor(),
+            schema.getTimeCompressor(),
             schema.getTimeTSEncoding(),
             schema.getTimeEncoder(),
             this.encryptParam);
@@ -76,7 +76,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
       valueChunkWriterList.add(
           new ValueChunkWriter(
               valueMeasurementIdList.get(i),
-              schema.getCompressor(),
+              schema.getValueCompressor(i),
               valueTSDataTypeList.get(i),
               valueTSEncodingList.get(i),
               valueEncoderList.get(i),
@@ -92,7 +92,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
     timeChunkWriter =
         new TimeChunkWriter(
             schema.getMeasurementName(),
-            schema.getCompressor(),
+            schema.getTimeCompressor(),
             schema.getTimeTSEncoding(),
             schema.getTimeEncoder(),
             this.encryptParam);
@@ -107,7 +107,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter 
{
       valueChunkWriterList.add(
           new ValueChunkWriter(
               valueMeasurementIdList.get(i),
-              schema.getCompressor(),
+              schema.getValueCompressor(i),
               valueTSDataTypeList.get(i),
               valueTSEncodingList.get(i),
               valueEncoderList.get(i),
@@ -193,7 +193,8 @@ public class AlignedChunkWriterImpl implements IChunkWriter 
{
     TSEncoding timeEncoding =
         
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
     TSDataType timeType = 
TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
-    CompressionType timeCompression = 
TSFileDescriptor.getInstance().getConfig().getCompressor();
+    CompressionType timeCompression =
+        
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64);
     timeChunkWriter =
         new TimeChunkWriter(
             "",
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
index f63c2dc2..59ba1816 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
@@ -67,7 +67,7 @@ public class MeasurementSchema
         measurementName,
         dataType,
         
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(dataType)),
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(dataType),
         null);
   }
 
@@ -77,7 +77,7 @@ public class MeasurementSchema
         measurementName,
         dataType,
         encoding,
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(dataType),
         null);
   }
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
index e05912d0..21c74265 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
@@ -56,7 +56,7 @@ public class TimeseriesSchema implements 
Comparable<TimeseriesSchema>, Serializa
         fullPath,
         tsDataType,
         
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)),
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType),
         Collections.emptyMap());
   }
 
@@ -66,7 +66,7 @@ public class TimeseriesSchema implements 
Comparable<TimeseriesSchema>, Serializa
         fullPath,
         type,
         encoding,
-        TSFileDescriptor.getInstance().getConfig().getCompressor(),
+        TSFileDescriptor.getInstance().getConfig().getCompressor(type),
         Collections.emptyMap());
   }
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
index c53fee32..777eaf87 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
@@ -47,13 +47,19 @@ public class VectorMeasurementSchema
       RamUsageEstimator.shallowSizeOfInstance(VectorMeasurementSchema.class);
   private static final long BUILDER_SIZE =
       RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class);
+  private static final byte NO_UNIFIED_COMPRESSOR = -1;
 
   private String deviceId;
   private Map<String, Integer> measurementsToIndexMap;
   private byte[] types;
   private byte[] encodings;
   private TSEncodingBuilder[] encodingConverters;
-  private byte compressor;
+
+  /** For compatibility of old versions. */
+  private byte unifiedCompressor;
+
+  /** [0] is for the time column. */
+  private byte[] compressors;
 
   public VectorMeasurementSchema() {}
 
@@ -80,7 +86,34 @@ public class VectorMeasurementSchema
     }
     this.encodings = encodingsInByte;
     this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
-    this.compressor = compressionType.serialize();
+    this.unifiedCompressor = compressionType.serialize();
+  }
+
+  /**
+   * Creates a schema whose columns each carry their own compression type.
+   *
+   * @param deviceId the device of the aligned series
+   * @param subMeasurements names of the value columns
+   * @param types data types of the value columns, in the order of {@code subMeasurements}
+   * @param encodings encodings of the value columns, in the order of {@code subMeasurements}
+   * @param compressors serialized compression types: index 0 is the time column and index
+   *     {@code i + 1} is value column {@code i}, so its length must be {@code
+   *     subMeasurements.length + 1}
+   * @throws NullPointerException if {@code compressors} is null
+   * @throws IllegalArgumentException if {@code compressors} has the wrong length
+   */
+  public VectorMeasurementSchema(
+      String deviceId,
+      String[] subMeasurements,
+      TSDataType[] types,
+      TSEncoding[] encodings,
+      byte[] compressors) {
+    Objects.requireNonNull(compressors, "compressors");
+    if (compressors.length != subMeasurements.length + 1) {
+      throw new IllegalArgumentException(
+          "Expected "
+              + (subMeasurements.length + 1)
+              + " compressors (time column + one per value column), got "
+              + compressors.length);
+    }
+    this.deviceId = deviceId;
+    this.measurementsToIndexMap = new HashMap<>();
+    for (int i = 0; i < subMeasurements.length; i++) {
+      measurementsToIndexMap.put(subMeasurements[i], i);
+    }
+    byte[] typesInByte = new byte[types.length];
+    for (int i = 0; i < types.length; i++) {
+      typesInByte[i] = types[i].serialize();
+    }
+    this.types = typesInByte;
+
+    byte[] encodingsInByte = new byte[encodings.length];
+    for (int i = 0; i < encodings.length; i++) {
+      encodingsInByte[i] = encodings[i].serialize();
+    }
+    this.encodings = encodingsInByte;
+    this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
+    this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
+    // Defensive copy: this array is later serialized and compared, so the caller must not be
+    // able to mutate it underneath the schema.
+    this.compressors = compressors.clone();
+  }
 
   public VectorMeasurementSchema(String deviceId, String[] subMeasurements, 
TSDataType[] types) {
@@ -101,7 +134,15 @@ public class VectorMeasurementSchema
               .serialize();
     }
     this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
-    this.compressor = 
TSFileDescriptor.getInstance().getConfig().getCompressor().serialize();
+    this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
+    // the first column is time
+    this.compressors = new byte[subMeasurements.length + 1];
+    compressors[0] =
+        
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64).serialize();
+    for (int i = 0; i < types.length; i++) {
+      compressors[i + 1] =
+          
TSFileDescriptor.getInstance().getConfig().getCompressor(types[i]).serialize();
+    }
   }
 
   public VectorMeasurementSchema(
@@ -124,9 +165,24 @@ public class VectorMeasurementSchema
     return deviceId;
   }
 
+  /**
+   * Returns the single compression type shared by every column of this schema.
+   *
+   * <p>Only schemas built with one unified compressor can answer this. Schemas that carry
+   * per-column compressors throw, because their columns may use different compression types.
+   *
+   * @deprecated aligned series should use {@link #getTimeCompressor()} and {@link
+   *     #getValueCompressor(int)} instead
+   * @throws UnsupportedOperationException if this schema has per-column compressors
+   */
+  @Deprecated
   @Override
   public CompressionType getCompressor() {
-    return CompressionType.deserialize(compressor);
+    if (compressors == null) {
+      // Unified (old-style) schema: keep the previous behavior for generic callers.
+      return CompressionType.deserialize(unifiedCompressor);
+    }
+    throw new UnsupportedOperationException("Aligned series should not invoke this method");
+  }
+
+  /**
+   * Returns the compression type of the time column.
+   *
+   * <p>Uses the per-column entry for the time column when present, otherwise the unified
+   * compressor.
+   */
+  public CompressionType getTimeCompressor() {
+    byte serializedType = (compressors == null) ? unifiedCompressor : compressors[0];
+    return CompressionType.deserialize(serializedType);
+  }
+
+  /**
+   * Returns the compression type of the value column at {@code index}.
+   *
+   * <p>Per-column entries are shifted by one because slot 0 holds the time column. Without
+   * per-column entries, the unified compressor is returned for every index.
+   *
+   * @param index zero-based position of the value column
+   */
+  public CompressionType getValueCompressor(int index) {
+    if (compressors == null) {
+      return CompressionType.deserialize(unifiedCompressor);
+    }
+    return CompressionType.deserialize(compressors[1 + index]);
   }
 
   @Override
@@ -276,7 +332,11 @@ public class VectorMeasurementSchema
     for (byte encoding : encodings) {
       byteLen += ReadWriteIOUtils.write(encoding, buffer);
     }
-    byteLen += ReadWriteIOUtils.write(compressor, buffer);
+    byteLen += ReadWriteIOUtils.write(unifiedCompressor, buffer);
+    if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      buffer.put(compressors);
+      byteLen += compressors.length;
+    }
 
     return byteLen;
   }
@@ -297,7 +357,11 @@ public class VectorMeasurementSchema
     for (byte encoding : encodings) {
       byteLen += ReadWriteIOUtils.write(encoding, outputStream);
     }
-    byteLen += ReadWriteIOUtils.write(compressor, outputStream);
+    byteLen += ReadWriteIOUtils.write(unifiedCompressor, outputStream);
+    if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      outputStream.write(compressors);
+      byteLen += compressors.length;
+    }
 
     return byteLen;
   }
@@ -348,7 +412,15 @@ public class VectorMeasurementSchema
     }
     vectorMeasurementSchema.encodings = encodings;
 
-    vectorMeasurementSchema.compressor = 
ReadWriteIOUtils.readByte(inputStream);
+    vectorMeasurementSchema.unifiedCompressor = ReadWriteIOUtils.readByte(inputStream);
+    if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      // Per-column compressors follow: index 0 is the time column, then one per value column.
+      byte[] compressors = new byte[measurementSize + 1];
+      // InputStream.read may legitimately return fewer bytes than requested, so keep reading
+      // until the array is full; only -1 (end of stream) means the data is truncated.
+      int offset = 0;
+      while (offset < compressors.length) {
+        int read = inputStream.read(compressors, offset, compressors.length - offset);
+        if (read < 0) {
+          throw new IOException("Unexpected end of stream when reading compressors");
+        }
+        offset += read;
+      }
+      vectorMeasurementSchema.compressors = compressors;
+    }
     return vectorMeasurementSchema;
   }
 
@@ -375,7 +447,12 @@ public class VectorMeasurementSchema
     }
     vectorMeasurementSchema.encodings = encodings;
 
-    vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(buffer);
+    vectorMeasurementSchema.unifiedCompressor = 
ReadWriteIOUtils.readByte(buffer);
+    if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+      byte[] compressors = new byte[measurementSize + 1];
+      buffer.get(compressors);
+      vectorMeasurementSchema.compressors = compressors;
+    }
     return vectorMeasurementSchema;
   }
 
@@ -391,12 +468,13 @@ public class VectorMeasurementSchema
     return Arrays.equals(types, that.types)
         && Arrays.equals(encodings, that.encodings)
         && Objects.equals(deviceId, that.deviceId)
-        && Objects.equals(compressor, that.compressor);
+        && unifiedCompressor == that.unifiedCompressor
+        // byte[] must be compared by content; Objects.equals would only compare references.
+        && Arrays.equals(compressors, that.compressors);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hash(deviceId, types, encodings, compressor);
+    // Hash arrays by content to match equals(), which compares them with Arrays.equals;
+    // passing the arrays to Objects.hash directly would use their identity hash codes.
+    return Objects.hash(
+        deviceId,
+        Arrays.hashCode(types),
+        Arrays.hashCode(encodings),
+        unifiedCompressor,
+        Arrays.hashCode(compressors));
   }
 
   /** compare by vector name */
@@ -424,7 +502,14 @@ public class VectorMeasurementSchema
           TSEncoding.deserialize(encodings[entry.getValue()]).toString());
       sc.addTail("],");
     }
-    sc.addTail(CompressionType.deserialize(compressor).toString());
+    if (unifiedCompressor != NO_UNIFIED_COMPRESSOR) {
+      sc.addTail(CompressionType.deserialize(unifiedCompressor).toString());
+    } else {
+      // Per-column compressors (time column first), comma-separated without a trailing comma.
+      for (int i = 0; i < compressors.length; i++) {
+        if (i > 0) {
+          sc.addTail(",");
+        }
+        sc.addTail(CompressionType.deserialize(compressors[i]).toString());
+      }
+    }
+
     return sc.toString();
   }
 
diff --git 
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 7d8dc490..31a6af52 100644
--- 
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++ 
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -403,6 +403,10 @@ public class TsFileIOWriter implements AutoCloseable {
    */
   @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
   public void endFile() throws IOException {
+    if (!canWrite) {
+      return;
+    }
+
     checkInMemoryPathCount();
     readChunkMetadataAndConstructIndexTree();
 
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java 
b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
index 90215ccf..30e36055 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
@@ -385,7 +385,7 @@ public class ChunkRewriteTest {
             measurementSchema.getMeasurementName(),
             newChunkData.capacity(),
             TSDataType.VECTOR,
-            measurementSchema.getCompressor(),
+            measurementSchema.getTimeCompressor(),
             measurementSchema.getTimeTSEncoding(),
             timeChunkWriter.getNumOfPages());
     return new Chunk(newChunkHeader, newChunkData, null, 
timeChunkWriter.getStatistics());
diff --git 
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java 
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
index 1dd2eff9..51a8b80a 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
@@ -19,22 +19,32 @@
 package org.apache.tsfile.write;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
 import org.apache.tsfile.common.constant.TsFileConstant;
 import org.apache.tsfile.constant.TestConstant;
+import org.apache.tsfile.enums.ColumnCategory;
 import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.header.ChunkGroupHeader;
 import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.ColumnSchema;
 import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
 import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TableSchema;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
 import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
 import org.apache.tsfile.file.metadata.enums.TSEncoding;
 import org.apache.tsfile.file.metadata.statistics.Statistics;
 import org.apache.tsfile.file.metadata.utils.TestHelper;
 import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
 import org.apache.tsfile.read.common.Path;
 import org.apache.tsfile.utils.MeasurementGroup;
+import org.apache.tsfile.write.record.Tablet;
 import org.apache.tsfile.write.schema.IMeasurementSchema;
 import org.apache.tsfile.write.schema.MeasurementSchema;
 import org.apache.tsfile.write.schema.Schema;
@@ -49,11 +59,14 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
+import java.nio.file.Files;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.junit.Assert.assertEquals;
+
 public class TsFileIOWriterTest {
 
   private static final String FILE_PATH =
@@ -99,14 +112,94 @@ public class TsFileIOWriterTest {
     }
   }
 
+  @Test
+  public void changeTypeCompressionTest() throws IOException, WriteProcessException {
+    TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+    // Remember the resolved per-type compressions so the shared config can be restored for other
+    // tests. NOTE(review): only setters exist for these overrides, so the resolved value (not a
+    // possibly-unset null override) is what gets restored.
+    CompressionType prevInt32Compression = config.getCompressor(TSDataType.INT32);
+    CompressionType prevTextCompression = config.getCompressor(TSDataType.TEXT);
+    config.setInt32Compression("UNCOMPRESSED");
+    config.setTextCompression("GZIP");
+
+    File tsFile =
+        new File(TestConstant.BASE_OUTPUT_PATH.concat("changeTypeCompressionTest.tsfile"));
+    try (TsFileIOWriter ioWriter = new TsFileIOWriter(tsFile);
+        TsFileWriter fileWriter = new TsFileWriter(ioWriter)) {
+      // Tree model: series whose schemas pick up the per-type default compression.
+      fileWriter.registerTimeseries(
+          Factory.DEFAULT_FACTORY.create("root.db1.d1"),
+          new MeasurementSchema("s1", TSDataType.INT32));
+      fileWriter.registerTimeseries(
+          Factory.DEFAULT_FACTORY.create("root.db1.d1"),
+          new MeasurementSchema("s2", TSDataType.TEXT));
+      // Table model: the same column types as FIELD columns of table "t1".
+      TableSchema tableSchema =
+          new TableSchema(
+              "t1",
+              Arrays.asList(
+                  new ColumnSchema("s1", TSDataType.INT32, ColumnCategory.FIELD),
+                  new ColumnSchema("s2", TSDataType.TEXT, ColumnCategory.FIELD)));
+      fileWriter.registerTableSchema(tableSchema);
+
+      Tablet treeTablet =
+          new Tablet(
+              "root.db1.d1",
+              Arrays.asList(
+                  new MeasurementSchema("s1", TSDataType.INT32),
+                  new MeasurementSchema("s2", TSDataType.TEXT)));
+      treeTablet.addTimestamp(0, 0);
+      treeTablet.addValue(0, 0, 0);
+      treeTablet.addValue(0, 1, "0");
+      fileWriter.writeTree(treeTablet);
+
+      Tablet tableTablet =
+          new Tablet(
+              "t1",
+              Arrays.asList("s1", "s2"),
+              Arrays.asList(TSDataType.INT32, TSDataType.TEXT),
+              Arrays.asList(ColumnCategory.FIELD, ColumnCategory.FIELD));
+      tableTablet.addTimestamp(0, 0);
+      tableTablet.addValue(0, 0, 0);
+      tableTablet.addValue(0, 1, "0");
+      fileWriter.writeTable(tableTablet);
+      fileWriter.flush();
+
+      // The test expects chunk group 0 to be the tree device and chunk group 1 the table device.
+      // The table device's chunk list presumably starts with its time chunk, so its value chunks
+      // sit at indices 1 and 2.
+      ChunkMetadata s1TreeChunkMeta =
+          ioWriter.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(0);
+      ChunkMetadata s2TreeChunkMeta =
+          ioWriter.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(1);
+      ChunkMetadata s1TableChunkMeta =
+          ioWriter.getChunkGroupMetadataList().get(1).getChunkMetadataList().get(1);
+      ChunkMetadata s2TableChunkMeta =
+          ioWriter.getChunkGroupMetadataList().get(1).getChunkMetadataList().get(2);
+
+      // Finish the file before reading it back; the later implicit close from
+      // try-with-resources must then be harmless.
+      fileWriter.close();
+
+      try (TsFileSequenceReader sequenceReader = new TsFileSequenceReader(tsFile.getPath())) {
+        Chunk chunk = sequenceReader.readMemChunk(s1TreeChunkMeta);
+        assertEquals(CompressionType.UNCOMPRESSED, chunk.getHeader().getCompressionType());
+        chunk = sequenceReader.readMemChunk(s2TreeChunkMeta);
+        assertEquals(CompressionType.GZIP, chunk.getHeader().getCompressionType());
+        chunk = sequenceReader.readMemChunk(s1TableChunkMeta);
+        assertEquals(CompressionType.UNCOMPRESSED, chunk.getHeader().getCompressionType());
+        chunk = sequenceReader.readMemChunk(s2TableChunkMeta);
+        assertEquals(CompressionType.GZIP, chunk.getHeader().getCompressionType());
+      }
+    } finally {
+      config.setInt32Compression(prevInt32Compression.name());
+      config.setTextCompression(prevTextCompression.name());
+      // Do not leave the test artifact behind in the shared output directory.
+      Files.deleteIfExists(tsFile.toPath());
+    }
+  }
+
   @Test
   public void endFileTest() throws IOException {
     TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH);
 
     // magic_string
-    Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic());
-    Assert.assertEquals(TSFileConfig.VERSION_NUMBER, 
reader.readVersionNumber());
-    Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic());
+    assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic());
+    assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber());
+    assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic());
 
     reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + 1);
 
@@ -114,39 +207,39 @@ public class TsFileIOWriterTest {
     ChunkGroupHeader chunkGroupHeader;
     for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
       // chunk group header
-      Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
+      assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
       chunkGroupHeader = reader.readChunkGroupHeader();
-      Assert.assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID());
+      assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID());
       // ordinary chunk header
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals(SENSOR_1, header.getMeasurementID());
+      assertEquals(SENSOR_1, header.getMeasurementID());
     }
 
     for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
       // chunk group header
-      Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
+      assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
       chunkGroupHeader = reader.readChunkGroupHeader();
-      Assert.assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID());
+      assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID());
       // vector chunk header (time)
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER, 
reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals("", header.getMeasurementID());
+      assertEquals("", header.getMeasurementID());
       // vector chunk header (values)
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals("s1", header.getMeasurementID());
-      Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
+      assertEquals("s1", header.getMeasurementID());
+      assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER, 
reader.readMarker());
       header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
-      Assert.assertEquals("s2", header.getMeasurementID());
+      assertEquals("s2", header.getMeasurementID());
     }
 
-    Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
+    assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
     reader.readPlanIndex();
-    Assert.assertEquals(100, reader.getMinPlanIndex());
-    Assert.assertEquals(10000, reader.getMaxPlanIndex());
+    assertEquals(100, reader.getMinPlanIndex());
+    assertEquals(10000, reader.getMaxPlanIndex());
 
-    Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker());
+    assertEquals(MetaMarker.SEPARATOR, reader.readMarker());
 
     // make sure timeseriesMetadata is only
     Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
@@ -167,7 +260,7 @@ public class TsFileIOWriterTest {
     for (MetadataIndexNode node : 
metaData.getTableMetadataIndexNodeMap().values()) {
       cnt += node.getChildren().size();
     }
-    Assert.assertEquals(2, cnt);
+    assertEquals(2, cnt);
   }
 
   private void writeChunkGroup(TsFileIOWriter writer, IMeasurementSchema 
measurementSchema)
@@ -200,7 +293,7 @@ public class TsFileIOWriterTest {
       // vector chunk (time)
       writer.startFlushChunk(
           vectorMeasurementSchema.getMeasurementName(),
-          vectorMeasurementSchema.getCompressor(),
+          vectorMeasurementSchema.getTimeCompressor(),
           vectorMeasurementSchema.getType(),
           vectorMeasurementSchema.getTimeTSEncoding(),
           Statistics.getStatsByType(vectorMeasurementSchema.getType()),
@@ -216,7 +309,7 @@ public class TsFileIOWriterTest {
         subStatistics.updateStats(0L, 0L);
         writer.startFlushChunk(
             vectorMeasurementSchema.getSubMeasurementsList().get(j),
-            vectorMeasurementSchema.getCompressor(),
+            vectorMeasurementSchema.getValueCompressor(j),
             vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j),
             vectorMeasurementSchema.getSubMeasurementsTSEncodingList().get(j),
             subStatistics,

Reply via email to