This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new de166a7d Support set default compression by data type (#523)
de166a7d is described below
commit de166a7d8bd1a5b3a6bdf4749362d583dfb1a9ef
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Jun 24 19:05:56 2025 +0800
Support set default compression by data type (#523)
---
.../apache/tsfile/common/conf/TSFileConfig.java | 80 ++++++++++++
.../tsfile/common/conf/TSFileDescriptor.java | 6 +
.../tsfile/write/chunk/AlignedChunkWriterImpl.java | 11 +-
.../tsfile/write/schema/MeasurementSchema.java | 4 +-
.../tsfile/write/schema/TimeseriesSchema.java | 4 +-
.../write/schema/VectorMeasurementSchema.java | 107 ++++++++++++++--
.../apache/tsfile/write/writer/TsFileIOWriter.java | 4 +
.../org/apache/tsfile/write/ChunkRewriteTest.java | 2 +-
.../apache/tsfile/write/TsFileIOWriterTest.java | 137 +++++++++++++++++----
9 files changed, 312 insertions(+), 43 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
index a987bf34..709af3c0 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileConfig.java
@@ -126,6 +126,24 @@ public class TSFileConfig implements Serializable {
/** Encoder of string, blob and text column. Default value is PLAIN. */
private String textEncoding = "PLAIN";
+ /** Compression of boolean column. Defaults to the overall compression. */
+ private String booleanCompression = null;
+
+ /** Compression of int32 and date column. Defaults to the overall
compression. */
+ private String int32Compression = null;
+
+ /** Compression of int64 and timestamp column. Defaults to the overall
compression. */
+ private String int64Compression = null;
+
+ /** Compression of float column. Defaults to the overall compression. */
+ private String floatCompression = null;
+
+ /** Compression of double column. Defaults to the overall compression. */
+ private String doubleCompression = null;
+
+ /** Compression of string, blob and text column. Defaults to the overall
compression. */
+ private String textCompression = null;
+
/**
* Encoder of value series. default value is PLAIN. For int, long data type,
TsFile also supports
* TS_2DIFF, REGULAR, GORILLA and RLE(run-length encoding). For float,
double data type, TsFile
@@ -361,6 +379,44 @@ public class TSFileConfig implements Serializable {
}
}
+ public CompressionType getCompressor(TSDataType dataType) {
+ String compressionName;
+ switch (dataType) {
+ case BOOLEAN:
+ compressionName = booleanCompression;
+ break;
+ case INT32:
+ case DATE:
+ compressionName = int32Compression;
+ break;
+ case INT64:
+ case TIMESTAMP:
+ compressionName = int64Compression;
+ break;
+ case FLOAT:
+ compressionName = floatCompression;
+ break;
+ case DOUBLE:
+ compressionName = doubleCompression;
+ break;
+ case STRING:
+ case BLOB:
+ case TEXT:
+ compressionName = textCompression;
+ break;
+ default:
+ compressionName = null;
+ }
+
+ CompressionType compressionType;
+ if (compressionName != null) {
+ compressionType = CompressionType.valueOf(compressionName);
+ } else {
+ compressionType = compressor;
+ }
+ return compressionType;
+ }
+
public void setValueEncoder(String valueEncoder) {
this.valueEncoder = valueEncoder;
}
@@ -689,4 +745,28 @@ public class TSFileConfig implements Serializable {
public void setLz4UseJni(boolean lz4UseJni) {
this.lz4UseJni = lz4UseJni;
}
+
+ public void setBooleanCompression(String booleanCompression) {
+ this.booleanCompression = booleanCompression;
+ }
+
+ public void setInt32Compression(String int32Compression) {
+ this.int32Compression = int32Compression;
+ }
+
+ public void setInt64Compression(String int64Compression) {
+ this.int64Compression = int64Compression;
+ }
+
+ public void setFloatCompression(String floatCompression) {
+ this.floatCompression = floatCompression;
+ }
+
+ public void setDoubleCompression(String doubleCompression) {
+ this.doubleCompression = doubleCompression;
+ }
+
+ public void setTextCompression(String textCompression) {
+ this.textCompression = textCompression;
+ }
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
index 435561d9..01b78316 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/common/conf/TSFileDescriptor.java
@@ -81,6 +81,12 @@ public class TSFileDescriptor {
writer.setInt(conf::setFloatPrecision, "float_precision");
writer.setString(conf::setValueEncoder, "value_encoder");
writer.setString(conf::setCompressor, "compressor");
+ writer.setString(conf::setBooleanCompression, "boolean_compressor");
+ writer.setString(conf::setInt32Compression, "int32_compressor");
+ writer.setString(conf::setInt64Compression, "int64_compressor");
+ writer.setString(conf::setFloatCompression, "float_compressor");
+ writer.setString(conf::setDoubleCompression, "double_compressor");
+ writer.setString(conf::setTextCompression, "text_compressor");
writer.setInt(conf::setBatchSize, "batch_size");
writer.setString(conf::setEncryptType, "encrypt_type");
writer.setBoolean(conf::setLz4UseJni, "lz4_use_jni");
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
index 49ec4d7f..2ec4bd8f 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/chunk/AlignedChunkWriterImpl.java
@@ -61,7 +61,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
timeChunkWriter =
new TimeChunkWriter(
schema.getMeasurementName(),
- schema.getCompressor(),
+ schema.getTimeCompressor(),
schema.getTimeTSEncoding(),
schema.getTimeEncoder(),
this.encryptParam);
@@ -76,7 +76,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
valueChunkWriterList.add(
new ValueChunkWriter(
valueMeasurementIdList.get(i),
- schema.getCompressor(),
+ schema.getValueCompressor(i),
valueTSDataTypeList.get(i),
valueTSEncodingList.get(i),
valueEncoderList.get(i),
@@ -92,7 +92,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter {
timeChunkWriter =
new TimeChunkWriter(
schema.getMeasurementName(),
- schema.getCompressor(),
+ schema.getTimeCompressor(),
schema.getTimeTSEncoding(),
schema.getTimeEncoder(),
this.encryptParam);
@@ -107,7 +107,7 @@ public class AlignedChunkWriterImpl implements IChunkWriter
{
valueChunkWriterList.add(
new ValueChunkWriter(
valueMeasurementIdList.get(i),
- schema.getCompressor(),
+ schema.getValueCompressor(i),
valueTSDataTypeList.get(i),
valueTSEncodingList.get(i),
valueEncoderList.get(i),
@@ -193,7 +193,8 @@ public class AlignedChunkWriterImpl implements IChunkWriter
{
TSEncoding timeEncoding =
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getTimeEncoder());
TSDataType timeType =
TSFileDescriptor.getInstance().getConfig().getTimeSeriesDataType();
- CompressionType timeCompression =
TSFileDescriptor.getInstance().getConfig().getCompressor();
+ CompressionType timeCompression =
+
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64);
timeChunkWriter =
new TimeChunkWriter(
"",
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
index f63c2dc2..59ba1816 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/MeasurementSchema.java
@@ -67,7 +67,7 @@ public class MeasurementSchema
measurementName,
dataType,
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(dataType)),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(dataType),
null);
}
@@ -77,7 +77,7 @@ public class MeasurementSchema
measurementName,
dataType,
encoding,
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(dataType),
null);
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
index e05912d0..21c74265 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/TimeseriesSchema.java
@@ -56,7 +56,7 @@ public class TimeseriesSchema implements
Comparable<TimeseriesSchema>, Serializa
fullPath,
tsDataType,
TSEncoding.valueOf(TSFileDescriptor.getInstance().getConfig().getValueEncoder(tsDataType)),
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(tsDataType),
Collections.emptyMap());
}
@@ -66,7 +66,7 @@ public class TimeseriesSchema implements
Comparable<TimeseriesSchema>, Serializa
fullPath,
type,
encoding,
- TSFileDescriptor.getInstance().getConfig().getCompressor(),
+ TSFileDescriptor.getInstance().getConfig().getCompressor(type),
Collections.emptyMap());
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
index c53fee32..777eaf87 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/schema/VectorMeasurementSchema.java
@@ -47,13 +47,19 @@ public class VectorMeasurementSchema
RamUsageEstimator.shallowSizeOfInstance(VectorMeasurementSchema.class);
private static final long BUILDER_SIZE =
RamUsageEstimator.shallowSizeOfInstance(TSEncodingBuilder.class);
+ private static final byte NO_UNIFIED_COMPRESSOR = -1;
private String deviceId;
private Map<String, Integer> measurementsToIndexMap;
private byte[] types;
private byte[] encodings;
private TSEncodingBuilder[] encodingConverters;
- private byte compressor;
+
+ /** For compatibility of old versions. */
+ private byte unifiedCompressor;
+
+ /** [0] is for the time column. */
+ private byte[] compressors;
public VectorMeasurementSchema() {}
@@ -80,7 +86,34 @@ public class VectorMeasurementSchema
}
this.encodings = encodingsInByte;
this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
- this.compressor = compressionType.serialize();
+ this.unifiedCompressor = compressionType.serialize();
+ }
+
+ public VectorMeasurementSchema(
+ String deviceId,
+ String[] subMeasurements,
+ TSDataType[] types,
+ TSEncoding[] encodings,
+ byte[] compressors) {
+ this.deviceId = deviceId;
+ this.measurementsToIndexMap = new HashMap<>();
+ for (int i = 0; i < subMeasurements.length; i++) {
+ measurementsToIndexMap.put(subMeasurements[i], i);
+ }
+ byte[] typesInByte = new byte[types.length];
+ for (int i = 0; i < types.length; i++) {
+ typesInByte[i] = types[i].serialize();
+ }
+ this.types = typesInByte;
+
+ byte[] encodingsInByte = new byte[encodings.length];
+ for (int i = 0; i < encodings.length; i++) {
+ encodingsInByte[i] = encodings[i].serialize();
+ }
+ this.encodings = encodingsInByte;
+ this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
+ this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
+ this.compressors = compressors;
}
public VectorMeasurementSchema(String deviceId, String[] subMeasurements,
TSDataType[] types) {
@@ -101,7 +134,15 @@ public class VectorMeasurementSchema
.serialize();
}
this.encodingConverters = new TSEncodingBuilder[subMeasurements.length];
- this.compressor =
TSFileDescriptor.getInstance().getConfig().getCompressor().serialize();
+ this.unifiedCompressor = NO_UNIFIED_COMPRESSOR;
+ // the first column is time
+ this.compressors = new byte[subMeasurements.length + 1];
+ compressors[0] =
+
TSFileDescriptor.getInstance().getConfig().getCompressor(TSDataType.INT64).serialize();
+ for (int i = 0; i < types.length; i++) {
+ compressors[i + 1] =
+
TSFileDescriptor.getInstance().getConfig().getCompressor(types[i]).serialize();
+ }
}
public VectorMeasurementSchema(
@@ -124,9 +165,24 @@ public class VectorMeasurementSchema
return deviceId;
}
+ @Deprecated // Aligned series should not invoke this method
@Override
public CompressionType getCompressor() {
- return CompressionType.deserialize(compressor);
+ throw new UnsupportedOperationException("Aligned series should not invoke
this method");
+ }
+
+ public CompressionType getTimeCompressor() {
+ if (compressors != null) {
+ return CompressionType.deserialize(compressors[0]);
+ }
+ return CompressionType.deserialize(unifiedCompressor);
+ }
+
+ public CompressionType getValueCompressor(int index) {
+ if (compressors != null) {
+ return CompressionType.deserialize(compressors[index + 1]);
+ }
+ return CompressionType.deserialize(unifiedCompressor);
}
@Override
@@ -276,7 +332,11 @@ public class VectorMeasurementSchema
for (byte encoding : encodings) {
byteLen += ReadWriteIOUtils.write(encoding, buffer);
}
- byteLen += ReadWriteIOUtils.write(compressor, buffer);
+ byteLen += ReadWriteIOUtils.write(unifiedCompressor, buffer);
+ if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+ buffer.put(compressors);
+ byteLen += compressors.length;
+ }
return byteLen;
}
@@ -297,7 +357,11 @@ public class VectorMeasurementSchema
for (byte encoding : encodings) {
byteLen += ReadWriteIOUtils.write(encoding, outputStream);
}
- byteLen += ReadWriteIOUtils.write(compressor, outputStream);
+ byteLen += ReadWriteIOUtils.write(unifiedCompressor, outputStream);
+ if (unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+ outputStream.write(compressors);
+ byteLen += compressors.length;
+ }
return byteLen;
}
@@ -348,7 +412,15 @@ public class VectorMeasurementSchema
}
vectorMeasurementSchema.encodings = encodings;
- vectorMeasurementSchema.compressor =
ReadWriteIOUtils.readByte(inputStream);
+ vectorMeasurementSchema.unifiedCompressor =
ReadWriteIOUtils.readByte(inputStream);
+ if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+ byte[] compressors = new byte[measurementSize + 1];
+ int read = inputStream.read(compressors);
+ if (read != measurementSize + 1) {
+ throw new IOException("Unexpected end of stream when reading
compressors");
+ }
+ vectorMeasurementSchema.compressors = compressors;
+ }
return vectorMeasurementSchema;
}
@@ -375,7 +447,12 @@ public class VectorMeasurementSchema
}
vectorMeasurementSchema.encodings = encodings;
- vectorMeasurementSchema.compressor = ReadWriteIOUtils.readByte(buffer);
+ vectorMeasurementSchema.unifiedCompressor =
ReadWriteIOUtils.readByte(buffer);
+ if (vectorMeasurementSchema.unifiedCompressor == NO_UNIFIED_COMPRESSOR) {
+ byte[] compressors = new byte[measurementSize + 1];
+ buffer.get(compressors);
+ vectorMeasurementSchema.compressors = compressors;
+ }
return vectorMeasurementSchema;
}
@@ -391,12 +468,13 @@ public class VectorMeasurementSchema
return Arrays.equals(types, that.types)
&& Arrays.equals(encodings, that.encodings)
&& Objects.equals(deviceId, that.deviceId)
- && Objects.equals(compressor, that.compressor);
+ && unifiedCompressor == that.unifiedCompressor
+ && Arrays.equals(compressors, that.compressors);
}
@Override
public int hashCode() {
- return Objects.hash(deviceId, types, encodings, compressor);
+ return Objects.hash(deviceId, types, encodings, unifiedCompressor,
Arrays.hashCode(compressors));
}
/** compare by vector name */
@@ -424,7 +502,14 @@ public class VectorMeasurementSchema
TSEncoding.deserialize(encodings[entry.getValue()]).toString());
sc.addTail("],");
}
- sc.addTail(CompressionType.deserialize(compressor).toString());
+ if (unifiedCompressor != NO_UNIFIED_COMPRESSOR) {
+ sc.addTail(CompressionType.deserialize(unifiedCompressor).toString());
+ } else {
+ for (byte compressor : compressors) {
+
sc.addTail(CompressionType.deserialize(compressor).toString()).addTail(",");
+ }
+ }
+
return sc.toString();
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
index 7d8dc490..31a6af52 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/write/writer/TsFileIOWriter.java
@@ -403,6 +403,10 @@ public class TsFileIOWriter implements AutoCloseable {
*/
@SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity
warning
public void endFile() throws IOException {
+ if (!canWrite) {
+ return;
+ }
+
checkInMemoryPathCount();
readChunkMetadataAndConstructIndexTree();
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
index 90215ccf..30e36055 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/ChunkRewriteTest.java
@@ -385,7 +385,7 @@ public class ChunkRewriteTest {
measurementSchema.getMeasurementName(),
newChunkData.capacity(),
TSDataType.VECTOR,
- measurementSchema.getCompressor(),
+ measurementSchema.getTimeCompressor(),
measurementSchema.getTimeTSEncoding(),
timeChunkWriter.getNumOfPages());
return new Chunk(newChunkHeader, newChunkData, null,
timeChunkWriter.getStatistics());
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
index 1dd2eff9..51a8b80a 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileIOWriterTest.java
@@ -19,22 +19,32 @@
package org.apache.tsfile.write;
import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.common.conf.TSFileDescriptor;
import org.apache.tsfile.common.constant.TsFileConstant;
import org.apache.tsfile.constant.TestConstant;
+import org.apache.tsfile.enums.ColumnCategory;
import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.header.ChunkGroupHeader;
import org.apache.tsfile.file.header.ChunkHeader;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.ColumnSchema;
import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.IDeviceID.Factory;
import org.apache.tsfile.file.metadata.MetadataIndexNode;
+import org.apache.tsfile.file.metadata.TableSchema;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
import org.apache.tsfile.file.metadata.TsFileMetadata;
+import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.file.metadata.statistics.Statistics;
import org.apache.tsfile.file.metadata.utils.TestHelper;
import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Chunk;
import org.apache.tsfile.read.common.Path;
import org.apache.tsfile.utils.MeasurementGroup;
+import org.apache.tsfile.write.record.Tablet;
import org.apache.tsfile.write.schema.IMeasurementSchema;
import org.apache.tsfile.write.schema.MeasurementSchema;
import org.apache.tsfile.write.schema.Schema;
@@ -49,11 +59,14 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import static org.junit.Assert.assertEquals;
+
public class TsFileIOWriterTest {
private static final String FILE_PATH =
@@ -99,14 +112,94 @@ public class TsFileIOWriterTest {
}
}
+ @Test
+ public void changeTypeCompressionTest() throws IOException,
WriteProcessException {
+ TSFileConfig config = TSFileDescriptor.getInstance().getConfig();
+ CompressionType prevInt32Compression =
config.getCompressor(TSDataType.INT32);
+ CompressionType prevTextCompression =
config.getCompressor(TSDataType.TEXT);
+ config.setInt32Compression("UNCOMPRESSED");
+ config.setTextCompression("GZIP");
+
+ try (TsFileIOWriter ioWriter =
+ new TsFileIOWriter(
+ new File(
+
TestConstant.BASE_OUTPUT_PATH.concat("changeTypeCompressionTest.tsfile")));
+ TsFileWriter fileWriter = new TsFileWriter(ioWriter)) {
+ fileWriter.registerTimeseries(
+ Factory.DEFAULT_FACTORY.create("root.db1.d1"),
+ new MeasurementSchema("s1", TSDataType.INT32));
+ fileWriter.registerTimeseries(
+ Factory.DEFAULT_FACTORY.create("root.db1.d1"),
+ new MeasurementSchema("s2", TSDataType.TEXT));
+ TableSchema tableSchema =
+ new TableSchema(
+ "t1",
+ Arrays.asList(
+ new ColumnSchema("s1", TSDataType.INT32,
ColumnCategory.FIELD),
+ new ColumnSchema("s2", TSDataType.TEXT,
ColumnCategory.FIELD)));
+ fileWriter.registerTableSchema(tableSchema);
+
+ Tablet treeTablet =
+ new Tablet(
+ "root.db1.d1",
+ Arrays.asList(
+ new MeasurementSchema("s1", TSDataType.INT32),
+ new MeasurementSchema("s2", TSDataType.TEXT)));
+ treeTablet.addTimestamp(0, 0);
+ treeTablet.addValue(0, 0, 0);
+ treeTablet.addValue(0, 1, "0");
+ fileWriter.writeTree(treeTablet);
+
+ Tablet tableTablet =
+ new Tablet(
+ "t1",
+ Arrays.asList("s1", "s2"),
+ Arrays.asList(TSDataType.INT32, TSDataType.TEXT),
+ Arrays.asList(ColumnCategory.FIELD, ColumnCategory.FIELD));
+ tableTablet.addTimestamp(0, 0);
+ tableTablet.addValue(0, 0, 0);
+ tableTablet.addValue(0, 1, "0");
+ fileWriter.writeTable(tableTablet);
+ fileWriter.flush();
+
+ ChunkMetadata s1TreeChunkMeta =
+
ioWriter.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(0);
+ ChunkMetadata s2TreeChunkMeta =
+
ioWriter.getChunkGroupMetadataList().get(0).getChunkMetadataList().get(1);
+ ChunkMetadata s1TableChunkMeta =
+
ioWriter.getChunkGroupMetadataList().get(1).getChunkMetadataList().get(1);
+ ChunkMetadata s2TableChunkMeta =
+
ioWriter.getChunkGroupMetadataList().get(1).getChunkMetadataList().get(2);
+
+ fileWriter.close();
+
+ try (TsFileSequenceReader sequenceReader =
+ new TsFileSequenceReader(
+
TestConstant.BASE_OUTPUT_PATH.concat("changeTypeCompressionTest.tsfile"))) {
+ Chunk chunk = sequenceReader.readMemChunk(s1TreeChunkMeta);
+ assertEquals(CompressionType.UNCOMPRESSED,
chunk.getHeader().getCompressionType());
+ chunk = sequenceReader.readMemChunk(s2TreeChunkMeta);
+ assertEquals(CompressionType.GZIP,
chunk.getHeader().getCompressionType());
+ chunk = sequenceReader.readMemChunk(s1TableChunkMeta);
+ assertEquals(CompressionType.UNCOMPRESSED,
chunk.getHeader().getCompressionType());
+ chunk = sequenceReader.readMemChunk(s2TableChunkMeta);
+ assertEquals(CompressionType.GZIP,
chunk.getHeader().getCompressionType());
+ }
+
+ } finally {
+ config.setInt32Compression(prevInt32Compression.name());
+ config.setTextCompression(prevTextCompression.name());
+ }
+ }
+
@Test
public void endFileTest() throws IOException {
TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH);
// magic_string
- Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic());
- Assert.assertEquals(TSFileConfig.VERSION_NUMBER,
reader.readVersionNumber());
- Assert.assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic());
+ assertEquals(TSFileConfig.MAGIC_STRING, reader.readHeadMagic());
+ assertEquals(TSFileConfig.VERSION_NUMBER, reader.readVersionNumber());
+ assertEquals(TSFileConfig.MAGIC_STRING, reader.readTailMagic());
reader.position(TSFileConfig.MAGIC_STRING.getBytes().length + 1);
@@ -114,39 +207,39 @@ public class TsFileIOWriterTest {
ChunkGroupHeader chunkGroupHeader;
for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
// chunk group header
- Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
+ assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
chunkGroupHeader = reader.readChunkGroupHeader();
- Assert.assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID());
+ assertEquals(DEVICE_1, chunkGroupHeader.getDeviceID());
// ordinary chunk header
- Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER,
reader.readMarker());
+ assertEquals(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER, reader.readMarker());
header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
- Assert.assertEquals(SENSOR_1, header.getMeasurementID());
+ assertEquals(SENSOR_1, header.getMeasurementID());
}
for (int i = 0; i < CHUNK_GROUP_NUM; i++) {
// chunk group header
- Assert.assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
+ assertEquals(MetaMarker.CHUNK_GROUP_HEADER, reader.readMarker());
chunkGroupHeader = reader.readChunkGroupHeader();
- Assert.assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID());
+ assertEquals(DEVICE_2, chunkGroupHeader.getDeviceID());
// vector chunk header (time)
- Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER,
reader.readMarker());
+ assertEquals(MetaMarker.ONLY_ONE_PAGE_TIME_CHUNK_HEADER,
reader.readMarker());
header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
- Assert.assertEquals("", header.getMeasurementID());
+ assertEquals("", header.getMeasurementID());
// vector chunk header (values)
- Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER,
reader.readMarker());
+ assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER,
reader.readMarker());
header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
- Assert.assertEquals("s1", header.getMeasurementID());
- Assert.assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER,
reader.readMarker());
+ assertEquals("s1", header.getMeasurementID());
+ assertEquals(MetaMarker.ONLY_ONE_PAGE_VALUE_CHUNK_HEADER,
reader.readMarker());
header = reader.readChunkHeader(MetaMarker.ONLY_ONE_PAGE_CHUNK_HEADER);
- Assert.assertEquals("s2", header.getMeasurementID());
+ assertEquals("s2", header.getMeasurementID());
}
- Assert.assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
+ assertEquals(MetaMarker.OPERATION_INDEX_RANGE, reader.readMarker());
reader.readPlanIndex();
- Assert.assertEquals(100, reader.getMinPlanIndex());
- Assert.assertEquals(10000, reader.getMaxPlanIndex());
+ assertEquals(100, reader.getMinPlanIndex());
+ assertEquals(10000, reader.getMaxPlanIndex());
- Assert.assertEquals(MetaMarker.SEPARATOR, reader.readMarker());
+ assertEquals(MetaMarker.SEPARATOR, reader.readMarker());
// make sure timeseriesMetadata is only
Map<IDeviceID, List<TimeseriesMetadata>> deviceTimeseriesMetadataMap =
@@ -167,7 +260,7 @@ public class TsFileIOWriterTest {
for (MetadataIndexNode node :
metaData.getTableMetadataIndexNodeMap().values()) {
cnt += node.getChildren().size();
}
- Assert.assertEquals(2, cnt);
+ assertEquals(2, cnt);
}
private void writeChunkGroup(TsFileIOWriter writer, IMeasurementSchema
measurementSchema)
@@ -200,7 +293,7 @@ public class TsFileIOWriterTest {
// vector chunk (time)
writer.startFlushChunk(
vectorMeasurementSchema.getMeasurementName(),
- vectorMeasurementSchema.getCompressor(),
+ vectorMeasurementSchema.getTimeCompressor(),
vectorMeasurementSchema.getType(),
vectorMeasurementSchema.getTimeTSEncoding(),
Statistics.getStatsByType(vectorMeasurementSchema.getType()),
@@ -216,7 +309,7 @@ public class TsFileIOWriterTest {
subStatistics.updateStats(0L, 0L);
writer.startFlushChunk(
vectorMeasurementSchema.getSubMeasurementsList().get(j),
- vectorMeasurementSchema.getCompressor(),
+ vectorMeasurementSchema.getValueCompressor(j),
vectorMeasurementSchema.getSubMeasurementsTSDataTypeList().get(j),
vectorMeasurementSchema.getSubMeasurementsTSEncodingList().get(j),
subStatistics,