This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 803b0cb7 Replace value decoder (#655)
803b0cb7 is described below

commit 803b0cb7a076a31b7a24e6719251949fb7789693
Author: shuwenwei <[email protected]>
AuthorDate: Mon Dec 8 10:10:53 2025 +0800

    Replace value decoder (#655)
---
 .../tsfile/encoding/decoder/DecoderWrapper.java    |  86 +++++++++++
 .../org/apache/tsfile/file/header/ChunkHeader.java |  20 +++
 .../apache/tsfile/read/TsFileSequenceReader.java   |   1 +
 .../reader/chunk/AbstractAlignedChunkReader.java   |   4 +-
 .../tsfile/read/reader/chunk/ChunkReader.java      |   3 +-
 .../tsfile/read/reader/ReplaceDecoderTest.java     | 157 +++++++++++++++++++++
 6 files changed, 266 insertions(+), 5 deletions(-)

diff --git a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/DecoderWrapper.java b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/DecoderWrapper.java
new file mode 100644
index 00000000..516589b8
--- /dev/null
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/DecoderWrapper.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.utils.Binary;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+
+public abstract class DecoderWrapper extends Decoder {
+
+  protected final Decoder originDecoder;
+
+  public DecoderWrapper(Decoder originDecoder) {
+    super(originDecoder.getType());
+    this.originDecoder = originDecoder;
+  }
+
+  @Override
+  public boolean hasNext(ByteBuffer buffer) throws IOException {
+    return originDecoder.hasNext(buffer);
+  }
+
+  @Override
+  public void reset() {
+    originDecoder.reset();
+  }
+
+  @Override
+  public int readInt(ByteBuffer buffer) {
+    return originDecoder.readInt(buffer);
+  }
+
+  @Override
+  public boolean readBoolean(ByteBuffer buffer) {
+    return originDecoder.readBoolean(buffer);
+  }
+
+  @Override
+  public short readShort(ByteBuffer buffer) {
+    return originDecoder.readShort(buffer);
+  }
+
+  @Override
+  public long readLong(ByteBuffer buffer) {
+    return originDecoder.readLong(buffer);
+  }
+
+  @Override
+  public float readFloat(ByteBuffer buffer) {
+    return originDecoder.readFloat(buffer);
+  }
+
+  @Override
+  public double readDouble(ByteBuffer buffer) {
+    return originDecoder.readDouble(buffer);
+  }
+
+  @Override
+  public Binary readBinary(ByteBuffer buffer) {
+    return originDecoder.readBinary(buffer);
+  }
+
+  @Override
+  public BigDecimal readBigDecimal(ByteBuffer buffer) {
+    return originDecoder.readBigDecimal(buffer);
+  }
+}
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
index c07071fb..c3a29d2f 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/file/header/ChunkHeader.java
@@ -20,6 +20,8 @@
 package org.apache.tsfile.file.header;
 
 import org.apache.tsfile.common.conf.TSFileConfig;
+import org.apache.tsfile.encoding.decoder.Decoder;
+import org.apache.tsfile.encoding.decoder.DecoderWrapper;
 import org.apache.tsfile.enums.TSDataType;
 import org.apache.tsfile.file.MetaMarker;
 import org.apache.tsfile.file.metadata.TimeseriesMetadata;
@@ -36,6 +38,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
+import java.util.function.Function;
 import java.util.function.LongConsumer;
 
 public class ChunkHeader {
@@ -64,6 +67,7 @@ public class ChunkHeader {
   // the following fields do not need to be serialized.
   private int numOfPages;
   private final int serializedSize;
+  private Function<Decoder, DecoderWrapper> replaceDecoder;
 
   public ChunkHeader(
       String measurementID,
@@ -277,6 +281,22 @@ public class ChunkHeader {
     return dataType;
   }
 
+  public Decoder calculateDecoderForNonTimeChunk() {
+    Decoder decoder = Decoder.getDecoderByType(encodingType, dataType);
+    return replaceDecoder == null ? decoder : replaceDecoder.apply(decoder);
+  }
+
+  public Decoder replaceDecoder(Decoder decoder) {
+    if (replaceDecoder == null) {
+      return decoder;
+    }
+    return replaceDecoder.apply(decoder);
+  }
+
+  public void setReplaceDecoder(Function<Decoder, DecoderWrapper> replaceDecoder) {
+    this.replaceDecoder = replaceDecoder;
+  }
+
   /**
    * serialize to outputStream.
    *
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
index da8d0169..132c6f34 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/TsFileSequenceReader.java
@@ -1671,6 +1671,7 @@ public class TsFileSequenceReader implements AutoCloseable {
 
   private List<TimeseriesMetadata> getDeviceTimeseriesMetadata(
       IDeviceID device, boolean needChunkMetadata) throws IOException {
+    readFileMetadata();
     MetadataIndexNode metadataIndexNode =
         tsFileMetaData.getTableMetadataIndexNode(device.getTableName());
     Pair<IMetadataIndexEntry, Long> metadataIndexPair =
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
index 6005bb0b..22d74353 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/AbstractAlignedChunkReader.java
@@ -229,9 +229,7 @@ public abstract class AbstractAlignedChunkReader extends AbstractChunkReader {
                 IUnCompressor.getUnCompressor(valueChunkHeader.getCompressionType()),
                 encryptParam);
         valueDataTypeList.add(valueChunkHeader.getDataType());
-        valueDecoderList.add(
-            Decoder.getDecoderByType(
-                valueChunkHeader.getEncodingType(), valueChunkHeader.getDataType()));
+        valueDecoderList.add(valueChunkHeader.calculateDecoderForNonTimeChunk());
         isAllNull = false;
       }
     }
diff --git a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
index 3dd5aacd..806a3b81 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/read/reader/chunk/ChunkReader.java
@@ -20,7 +20,6 @@
 package org.apache.tsfile.read.reader.chunk;
 
 import org.apache.tsfile.compress.IUnCompressor;
-import org.apache.tsfile.encoding.decoder.Decoder;
 import org.apache.tsfile.encrypt.EncryptParameter;
 import org.apache.tsfile.encrypt.IDecryptor;
 import org.apache.tsfile.file.MetaMarker;
@@ -141,7 +140,7 @@ public class ChunkReader extends AbstractChunkReader {
             new LazyLoadPageData(
                 chunkDataBuffer.array(), currentPagePosition, unCompressor, encryptParam),
             chunkHeader.getDataType(),
-            Decoder.getDecoderByType(chunkHeader.getEncodingType(), chunkHeader.getDataType()),
+            chunkHeader.calculateDecoderForNonTimeChunk(),
             defaultTimeDecoder,
             queryFilter);
     reader.setDeleteIntervalList(deleteIntervalList);
diff --git a/java/tsfile/src/test/java/org/apache/tsfile/read/reader/ReplaceDecoderTest.java b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/ReplaceDecoderTest.java
new file mode 100644
index 00000000..b88dc4a2
--- /dev/null
+++ b/java/tsfile/src/test/java/org/apache/tsfile/read/reader/ReplaceDecoderTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.read.reader;
+
+import org.apache.tsfile.constant.TestConstant;
+import org.apache.tsfile.encoding.decoder.DecoderWrapper;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.exception.write.WriteProcessException;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.ChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.StringArrayDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TimeValuePair;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.BatchData;
+import org.apache.tsfile.read.common.Chunk;
+import org.apache.tsfile.read.reader.chunk.AlignedChunkReader;
+import org.apache.tsfile.read.reader.chunk.ChunkReader;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+
+public class ReplaceDecoderTest {
+
+  private static final String FILE_PATH =
+      TestConstant.BASE_OUTPUT_PATH.concat("ReplaceDecoder.tsfile");
+
+  @After
+  public void teardown() {
+    new File(FILE_PATH).delete();
+  }
+
+  @Test
+  public void testNonAligned() throws IOException, WriteProcessException {
+    IDeviceID deviceID = new StringArrayDeviceID("root.test.d1");
+    try (TsFileWriter writer = new TsFileWriter(new File(FILE_PATH))) {
+      writer.registerTimeseries(deviceID, new MeasurementSchema("s1", TSDataType.INT32));
+      Tablet tablet =
+          new Tablet(
+              deviceID.toString(),
+              Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
+      for (int i = 0; i < 10; i++) {
+        tablet.addTimestamp(i, i);
+        tablet.addValue(i, 0, i);
+      }
+      writer.writeTree(tablet);
+      writer.flush();
+    }
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+      TimeseriesMetadata timeseriesMetadata = reader.getDeviceTimeseriesMetadata(deviceID).get(0);
+      for (IChunkMetadata iChunkMetadata : timeseriesMetadata.getChunkMetadataList()) {
+        Chunk chunk = reader.readMemChunk((ChunkMetadata) iChunkMetadata);
+        chunk
+            .getHeader()
+            .setReplaceDecoder(
+                decoder ->
+                    new DecoderWrapper(decoder) {
+                      @Override
+                      public int readInt(ByteBuffer buffer) {
+                        return decoder.readInt(buffer) + 10;
+                      }
+                    });
+        ChunkReader chunkReader = new ChunkReader(chunk);
+        while (chunkReader.hasNextSatisfiedPage()) {
+          BatchData batchData = chunkReader.nextPageData();
+          IPointReader pointReader = batchData.getBatchDataIterator();
+          while (pointReader.hasNextTimeValuePair()) {
+            TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+            Assert.assertEquals(
+                (int) timeValuePair.getTimestamp(), (Integer) timeValuePair.getValues()[0] - 10);
+          }
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAligned() throws IOException, WriteProcessException {
+    IDeviceID deviceID = new StringArrayDeviceID("root.test.d1");
+    try (TsFileWriter writer = new TsFileWriter(new File(FILE_PATH))) {
+      writer.registerAlignedTimeseries(
+          deviceID, Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
+      Tablet tablet =
+          new Tablet(
+              deviceID.toString(),
+              Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT32)));
+      for (int i = 0; i < 10; i++) {
+        tablet.addTimestamp(i, i);
+        tablet.addValue(i, 0, i);
+      }
+      writer.writeTree(tablet);
+      writer.flush();
+    }
+    try (TsFileSequenceReader reader = new TsFileSequenceReader(FILE_PATH)) {
+      List<AbstractAlignedChunkMetadata> alignedChunkMetadataList =
+          reader.getAlignedChunkMetadata(deviceID, true);
+
+      for (AbstractAlignedChunkMetadata alignedChunkMetadata : alignedChunkMetadataList) {
+        Chunk timeChunk =
+            reader.readMemChunk((ChunkMetadata) alignedChunkMetadata.getTimeChunkMetadata());
+        Chunk valueChunk =
+            reader.readMemChunk(
+                (ChunkMetadata) alignedChunkMetadata.getValueChunkMetadataList().get(0));
+        valueChunk
+            .getHeader()
+            .setReplaceDecoder(
+                decoder ->
+                    new DecoderWrapper(decoder) {
+                      @Override
+                      public int readInt(ByteBuffer buffer) {
+                        return decoder.readInt(buffer) + 10;
+                      }
+                    });
+        AlignedChunkReader chunkReader =
+            new AlignedChunkReader(timeChunk, Collections.singletonList(valueChunk));
+        while (chunkReader.hasNextSatisfiedPage()) {
+          BatchData batchData = chunkReader.nextPageData();
+          IPointReader pointReader = batchData.getBatchDataIterator();
+          while (pointReader.hasNextTimeValuePair()) {
+            TimeValuePair timeValuePair = pointReader.nextTimeValuePair();
+            Assert.assertEquals(
+                (int) timeValuePair.getTimestamp(), (Integer) timeValuePair.getValues()[0] - 10);
+          }
+        }
+      }
+    }
+  }
+}

Reply via email to