This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 803b0cb7 Replace value decoder (#655)
803b0cb7 is described below
commit 803b0cb7a076a31b7a24e6719251949fb7789693
Author: shuwenwei <[email protected]>
AuthorDate: Mon Dec 8 10:10:53 2025 +0800
Replace value decoder (#655)
---
.../tsfile/encoding/decoder/DecoderWrapper.java | 86 +++++++++++
.../org/apache/tsfile/file/header/ChunkHeader.java | 20 +++
.../apache/tsfile/read/TsFileSequenceReader.java | 1 +
.../reader/chunk/AbstractAlignedChunkReader.java | 4 +-
.../tsfile/read/reader/chunk/ChunkReader.java | 3 +-
.../tsfile/read/reader/ReplaceDecoderTest.java | 157 +++++++++++++++++++++
6 files changed, 266 insertions(+), 5 deletions(-)
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/DecoderWrapper.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/DecoderWrapper.java
new file mode 100644
index 00000000..516589b8
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/DecoderWrapper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.utils.Binary;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+public abstract class DecoderWrapper extends Decoder {
+
+ protected final Decoder originDecoder;
+
+ public DecoderWrapper(Decoder originDecoder) {
+ super(originDecoder.getType());
+ this.originDecoder = originDecoder;
+ }
+
+ @Override
+ public boolean hasNext(ByteBuffer buffer) throws IOException {
+ return originDecoder.hasNext(buffer);
+ }
+
+ @Override
+ public void reset() {
+ originDecoder.reset();
+ }
+
+ @Override
+ public int readInt(ByteBuffer buffer) {
+ return originDecoder.readInt(buffer);
+ }
+
+ @Override
+ public boolean readBoolean(ByteBuffer buffer) {
+ return originDecoder.readBoolean(buffer);
+ }
+
+ @Override
+ public short readShort(ByteBuffer buffer) {
+ return originDecoder.readShort(buffer);
+ }
+
+ @Override
+ public long readLong(ByteBuffer buffer) {
+ return originDecoder.readLong(buffer);
+ }
+
+ @Override
+ public float readFloat(ByteBuffer buffer) {
+ return originDecoder.readFloat(buffer);
+ }
+
+ @Override
+ public double readDouble(ByteBuffer buffer) {
+ return originDecoder.readDouble(buffer);
+ }
+
+ @Override
+ public Binary readBinary(ByteBuffer buffer) {
+ return originDecoder.readBinary(buffer);
+ }
+
+ @Override
+ public BigDecimal readBigDecimal(ByteBuffer buffer) {
+ return originDecoder.readBigDecimal(buffer);
+ }
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
index c07071fb..c3a29d2f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
@@ -20,6 +20,8 @@
package org.apache.tsfile.file.header;
import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.encoding.decoder.DecoderWrapper;
import org.apache.tsfile.enums.TSDataType;
import org.apache.tsfile.file.MetaMarker;
import org.apache.tsfile.file.metadata.TimeseriesMetadata;
@@ -36,6 +38,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.util.function.Function;
import java.util.function.LongConsumer;
public class ChunkHeader {
@@ -64,6 +67,7 @@ public class ChunkHeader {
// the following fields do not need to be serialized.
private int numOfPages;
private final int serializedSize;
+ private Function<Decoder, DecoderWrapper> replaceDecoder;
public ChunkHeader(
String measurementID,
@@ -277,6 +281,22 @@ public class ChunkHeader {
return dataType;
}
+ public Decoder calculateDecoderForNonTimeChunk() {
+ Decoder decoder = Decoder.getDecoderByType(encodingType, dataType);
+ return replaceDecoder == null ? decoder : replaceDecoder.apply(decoder);
+ }
+
+ public Decoder replaceDecoder(Decoder decoder) {
+ if (replaceDecoder == null) {
+ return decoder;
+ }
+ return replaceDecoder.apply(decoder);
+ }
+
+ public void setReplaceDecoder(Function<Decoder, DecoderWrapper> replaceDecoder) {
+ this.replaceDecoder = replaceDecoder;
+ }
+
/**
* serialize to outputStream.
*
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index da8d0169..132c6f34 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -1671,6 +1671,7 @@ public class TsFileSequenceReader implements AutoCloseable {
private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(
IDeviceID device, boolean needChunkMetadata) throws IOException {
+ readFileMetadata();
MetadataIndexNode metadataIndexNode =
tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
Pair<IMetadataIndexEntry, Long> metadataIndexPair =
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
index 6005bb0b..22d74353 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
@@ -229,9 +229,7 @@ public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {
IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()),
encryptParam);
valueDataTypeList.add(valueChunkHeader.getDataType());
- valueDecoderList.add(
- Decoder.getDecoderByType(
- valueChunkHeader.getEncodingType(), valueChunkHeader.getDataType()));
+ valueDecoderList.add(valueChunkHeader.calculateDecoderForNonTimeChunk());
isAllNull = false;
}
}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
index 3dd5aacd..806a3b81 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
@@ -20,7 +20,6 @@
package org.apache.tsfile.read.reader.chunk;
import org.apache.tsfile.compress.IUnCompressor;
-import org.apache.tsfile.encoding.decoder.Decoder;
import org.apache.tsfile.encrypt.EncryptParameter;
import org.apache.tsfile.encrypt.IDecryptor;
import org.apache.tsfile.file.MetaMarker;
@@ -141,7 +140,7 @@ public class ChunkReader extends AbstractChunkReader {
new LazyLoadPageData(
chunkDataBuffer.array(), currentPagePosition, unCompressor, encryptParam),
chunkHeader.getDataType(),
- Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()),
+ chunkHeader.calculateDecoderForNonTimeChunk(),
defaultTimeDecoder,
queryFilter);
reader.setDeleteIntervalList(deleteIntervalList);
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/ReplaceDecoderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/ReplaceDecoderTest.java
new file mode 100644
index 00000000..b88dc4a2
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/ReplaceDecoderTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader;
+
+import org.apache.tsfile.constant.TestConstant;
+import org.apache.tsfile.encoding.decoder.DecoderWrapper;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class ReplaceDecoderTest {
+
+ private static final String FILE_PATH =
+ TestConstant.BASE_OUTPUT_PATH.concat("ReplaceDecoder.tsfile");
+
+ @After
+ public void teardown() {
+ new File(FILE_PATH).delete();
+ }
+
+ @Test
+ public void testNonAligned() throws IOException, WriteProcessException {
+ IDeviceID deviceID = new StringArrayDeviceID("root.test.d1");
+ try (TsFileWriter writer = new TsFileWriter(new File(FILE_PATH))) {
+ writer.registerTimeseries(deviceID, new MeasurementSchema("s1", TSDataType.INT32));
+ Tablet tablet =
+ new Tablet(
+ deviceID.toString(),
+ Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
+ for (int i = 0; i < 10; i++) {
+ tablet.addTimestamp(i, i);
+ tablet.addValue(i, 0, i);
+ }
+ writer.writeTree(tablet);
+ writer.flush();
+ }
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ TimeseriesMetadata timeseriesMetadata = reader.getDeviceTimeseriesMetadata(deviceID).get(0);
+ for (IChunkMetadata iChunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
+ Chunk chunk = reader.readMemChunk((ChunkMetadata) iChunkMetadata);
+ chunk
+ .getHeader()
+ .setReplaceDecoder(
+ decoder ->
+ new DecoderWrapper(decoder) {
+ @Override
+ public int readInt(ByteBuffer buffer) {
+ return decoder.readInt(buffer) + 10;
+ }
+ });
+ ChunkReader chunkReader = new ChunkReader(chunk);
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
+ IPointReader pointReader = batchData.getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+ Assert.assertEquals(
+ (int) timeValuePair.getTimestamp(), (Integer) timeValuePair.getValues()[0] - 10);
+ }
+ }
+ }
+ }
+ }
+
+ @Test
+ public void testAligned() throws IOException, WriteProcessException {
+ IDeviceID deviceID = new StringArrayDeviceID("root.test.d1");
+ try (TsFileWriter writer = new TsFileWriter(new File(FILE_PATH))) {
+ writer.registerAlignedTimeseries(
+ deviceID, Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
+ Tablet tablet =
+ new Tablet(
+ deviceID.toString(),
+ Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
+ for (int i = 0; i < 10; i++) {
+ tablet.addTimestamp(i, i);
+ tablet.addValue(i, 0, i);
+ }
+ writer.writeTree(tablet);
+ writer.flush();
+ }
+ try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+ List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+ reader.getAlignedChunkMetadata(deviceID, true);
+
+ for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
+ Chunk timeChunk =
+ reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
+ Chunk valueChunk =
+ reader.readMemChunk(
+ (ChunkMetadata) alignedChunkMetadata.getValueChunkMetadataList().get(0));
+ valueChunk
+ .getHeader()
+ .setReplaceDecoder(
+ decoder ->
+ new DecoderWrapper(decoder) {
+ @Override
+ public int readInt(ByteBuffer buffer) {
+ return decoder.readInt(buffer) + 10;
+ }
+ });
+ AlignedChunkReader chunkReader =
+ new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk));
+ while (chunkReader.hasNextSatisfiedPage()) {
+ BatchData batchData = chunkReader.nextPageData();
+ IPointReader pointReader = batchData.getBatchDataIterator();
+ while (pointReader.hasNextTimeValuePair()) {
+ TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+ Assert.assertEquals(
+ (int) timeValuePair.getTimestamp(), (Integer) timeValuePair.getValues()[0] - 10);
+ }
+ }
+ }
+ }
+ }
+}