This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 95dbf6a2 Feature/codec/camel (#560)
95dbf6a2 is described below
commit 95dbf6a25ef03886c6547a9383dcb63b6f8ca650
Author: Hongzhi Gao <[email protected]>
AuthorDate: Mon Aug 18 14:20:24 2025 +0800
Feature/codec/camel (#560)
* implement camel codec
* implement basic camel
* implement simple gorilla codec
* tmp
* implement camel codec
* spotless apply
* fix some code notes
* adjusted camel codec interface
* implement tsfile-camel
* implement tsfile-camel
* add license
* upgrade commons-lang3 for CVE-2025-48924
* Avoid using Double
* fix camel decoder
* Optimize the getValues() interface to reuse the returned double[] array
* removed unused variables
---
.../tsfile/common/bitStream/BitInputStream.java | 264 ++++++++++++++++
.../tsfile/common/bitStream/BitOutputStream.java | 196 ++++++++++++
.../apache/tsfile/common/bitStream/BitStream.java | 37 +++
.../bitStream/ByteBufferBackedInputStream.java | 51 +++
.../tsfile/encoding/decoder/CamelDecoder.java | 269 ++++++++++++++++
.../apache/tsfile/encoding/decoder/Decoder.java | 7 +
.../tsfile/encoding/encoder/CamelEncoder.java | 297 ++++++++++++++++++
.../tsfile/encoding/encoder/TSEncodingBuilder.java | 21 ++
.../tsfile/file/metadata/enums/TSEncoding.java | 6 +-
.../tsfile/common/bitStream/TestBitStream.java | 346 +++++++++++++++++++++
.../tsfile/encoding/decoder/CamelDecoderTest.java | 281 +++++++++++++++++
.../apache/tsfile/write/TsFileReadWriteTest.java | 3 +-
12 files changed, 1776 insertions(+), 2 deletions(-)
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitInputStream.java
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitInputStream.java
new file mode 100644
index 00000000..4da6c8f3
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitInputStream.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.common.bitStream;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * A stream for reading individual bits or groups of bits from an InputStream.
+ *
+ * <p>Within each byte fetched from the underlying stream, the most significant unread bit is
+ * handed out first. Every read first checks {@link #availableBits()}, which is capped by the
+ * {@code totalBits} given at construction, so reads are refused once that many bits were consumed.
+ */
+public class BitInputStream extends BitStream {
+
+  protected InputStream in;
+  protected int buffer; // Last byte fetched from the underlying stream
+  protected int bufferBitCount; // Number of not-yet-consumed low-order bits in buffer
+  protected final long totalBits; // Total valid bits
+  protected long bitsRead = 0; // Number of bits read so far
+
+  // Snapshot of the bit-level state taken by mark() and restored by reset()
+  protected int markedBuffer = 0;
+  protected int markedBufferBitCount = 0;
+  protected long markedBitsRead = 0;
+
+  /**
+   * Constructs a BitInputStream with a given InputStream and total number of valid bits.
+   *
+   * @param in the underlying InputStream
+   * @param totalBits the total number of valid bits to read
+   */
+  public BitInputStream(InputStream in, long totalBits) {
+    this.in = in;
+    this.totalBits = totalBits;
+    this.bufferBitCount = 0;
+  }
+
+  /**
+   * Reads an integer value using the specified number of bits. If the underlying stream ends
+   * part-way through, the bits obtained so far are returned at the position they would occupy in
+   * the complete value (the missing low-order bits are zero).
+   *
+   * @param numBits the number of bits to read (0 to 32)
+   * @return an integer whose lower bits contain the read value
+   * @throws IllegalArgumentException if {@code numBits} is not between 0 and 32
+   * @throws EOFException if no data is available to read
+   * @throws IOException if an I/O error occurs
+   */
+  public int readInt(int numBits) throws IOException {
+    // Reject bad arguments before touching any state, mirroring readLong.
+    if (numBits > Integer.SIZE || numBits < 0) {
+      throw new IllegalArgumentException("numBits must be between 0 and 32");
+    }
+    if (availableBits() <= 0) {
+      throw new EOFException();
+    }
+
+    bitsRead += numBits;
+    int result = 0;
+    boolean hasReadData = false;
+
+    while (numBits > 0) {
+      if (bufferBitCount == 0) {
+        buffer = in.read();
+        if (buffer < 0) {
+          if (!hasReadData) {
+            throw new EOFException();
+          }
+          return result;
+        }
+        bufferBitCount = BITS_PER_BYTE;
+      }
+
+      if (bufferBitCount > numBits) {
+        // The buffered byte holds more than we need: take its top numBits unread bits.
+        result |= (buffer >> (bufferBitCount - numBits)) & MASKS[numBits];
+        bufferBitCount -= numBits;
+        numBits = 0;
+      } else {
+        // Consume the rest of the buffered byte and place it above the bits still to come.
+        result |= (buffer & MASKS[bufferBitCount]) << (numBits - bufferBitCount);
+        numBits -= bufferBitCount;
+        bufferBitCount = 0;
+      }
+
+      hasReadData = true;
+    }
+
+    return result;
+  }
+
+  /**
+   * Reads a long value using the specified number of bits. If the underlying stream ends part-way
+   * through, the bits obtained so far are returned right-aligned.
+   *
+   * @param numBits the number of bits to read (0 to 64)
+   * @return a long value containing the read bits
+   * @throws IllegalArgumentException if {@code numBits} is not between 0 and 64
+   * @throws EOFException if no data is available to read
+   * @throws IOException if an I/O error occurs
+   */
+  public long readLong(int numBits) throws IOException {
+    // Validate first so that a rejected call cannot corrupt the bitsRead counter.
+    if (numBits > Long.SIZE || numBits < 0) {
+      throw new IllegalArgumentException("numBits must be between 0 and 64");
+    }
+    if (availableBits() <= 0) {
+      throw new EOFException();
+    }
+    bitsRead += numBits;
+
+    long result = 0;
+    boolean hasReadData = false;
+
+    while (numBits > 0) {
+      if (bufferBitCount == 0) {
+        buffer = in.read();
+        if (buffer < 0) {
+          if (!hasReadData) {
+            throw new EOFException();
+          }
+          return result;
+        }
+        bufferBitCount = BITS_PER_BYTE;
+      }
+
+      if (bufferBitCount > numBits) {
+        int shift = bufferBitCount - numBits;
+        result = (result << numBits) | ((buffer >> shift) & MASKS[numBits]);
+        bufferBitCount -= numBits;
+        buffer &= MASKS[bufferBitCount];
+        numBits = 0;
+      } else {
+        result = (result << bufferBitCount) | (buffer & MASKS[bufferBitCount]);
+        numBits -= bufferBitCount;
+        bufferBitCount = 0;
+      }
+
+      hasReadData = true;
+    }
+
+    return result;
+  }
+
+  /**
+   * Reads a ZigZag-encoded variable-length int. The value is stored as groups of 7 data bits, least
+   * significant group first, each followed by one continuation bit (1 = another group follows).
+   *
+   * @param in the stream to read from
+   * @return the decoded signed value
+   * @throws IOException if the encoding runs past 32 bits or an I/O error occurs
+   */
+  public static int readVarInt(BitInputStream in) throws IOException {
+    int result = 0;
+    int shift = 0;
+
+    while (true) {
+      int chunk = in.readInt(7);
+      boolean hasNext = in.readBit();
+      result |= chunk << shift;
+      if (!hasNext) {
+        break;
+      }
+      shift += 7;
+      if (shift >= 32) {
+        throw new IOException("VarInt too long");
+      }
+    }
+
+    // ZigZag decode
+    return (result >>> 1) ^ -(result & 1);
+  }
+
+  /**
+   * Reads a ZigZag-encoded variable-length long. The value is stored as groups of 7 data bits,
+   * least significant group first, each followed by one continuation bit (1 = another group
+   * follows).
+   *
+   * @param in the stream to read from
+   * @return the decoded signed value
+   * @throws IOException if the encoding runs past 64 bits or an I/O error occurs
+   */
+  public static long readVarLong(BitInputStream in) throws IOException {
+    long result = 0;
+    int shift = 0;
+
+    while (true) {
+      long chunk = in.readInt(7);
+      boolean hasNext = in.readBit();
+
+      result |= chunk << shift;
+      shift += 7;
+
+      if (!hasNext) {
+        break;
+      }
+
+      if (shift >= 64) {
+        throw new IOException("VarLong too long: overflow");
+      }
+    }
+
+    // ZigZag decode
+    return (result >>> 1) ^ -(result & 1);
+  }
+
+  /**
+   * Reads a single bit from the stream.
+   *
+   * @return true if the bit is 1, false if it is 0
+   * @throws EOFException if no bits are available
+   * @throws IOException if an I/O error occurs
+   */
+  public boolean readBit() throws IOException {
+    if (availableBits() <= 0) {
+      throw new EOFException();
+    }
+    bitsRead += 1;
+    if (bufferBitCount == 0) {
+      buffer = in.read();
+      if (buffer < 0) {
+        throw new EOFException();
+      }
+      bufferBitCount = BITS_PER_BYTE;
+    }
+
+    boolean bit = ((buffer >> (bufferBitCount - 1)) & 1) != 0;
+    bufferBitCount--;
+    return bit;
+  }
+
+  /**
+   * Returns whether this stream supports mark/reset, which is decided by the underlying stream.
+   *
+   * @return true if mark/reset is supported
+   */
+  public boolean markSupported() {
+    return in.markSupported();
+  }
+
+  /**
+   * Marks the current position in the stream.
+   *
+   * @param readLimit the maximum number of bits that can be read before the mark becomes invalid
+   */
+  public void mark(int readLimit) {
+    // The underlying stream counts bytes, so round the bit limit up to whole bytes.
+    in.mark((readLimit + BITS_PER_BYTE - 1) / BITS_PER_BYTE);
+    markedBuffer = buffer;
+    markedBufferBitCount = bufferBitCount;
+    markedBitsRead = bitsRead;
+  }
+
+  /**
+   * Resets the stream to the most recent marked position.
+   *
+   * @throws IOException if mark was not called or has been invalidated
+   */
+  public void reset() throws IOException {
+    in.reset();
+    buffer = markedBuffer;
+    bufferBitCount = markedBufferBitCount;
+    bitsRead = markedBitsRead;
+  }
+
+  /**
+   * Returns the number of bits still available to read: the smaller of what the underlying stream
+   * reports (plus the partially consumed byte) and what is left of {@code totalBits}. The result
+   * is never negative and saturates at {@link Integer#MAX_VALUE}.
+   *
+   * @return the number of remaining available bits
+   * @throws IOException if an I/O error occurs
+   */
+  public int availableBits() throws IOException {
+    long buffered = ((long) in.available() * BITS_PER_BYTE) + bufferBitCount;
+    long remaining = Math.min(buffered, totalBits - bitsRead);
+    if (remaining <= 0) {
+      // An over-read can push bitsRead past totalBits; report that as "nothing left".
+      return 0;
+    }
+    // Saturate instead of narrowing blindly: a plain (int) cast wraps for very large streams.
+    return (int) Math.min(remaining, Integer.MAX_VALUE);
+  }
+
+  /**
+   * Closes the underlying InputStream.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  public void close() throws IOException {
+    in.close();
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitOutputStream.java
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitOutputStream.java
new file mode 100644
index 00000000..641ccd53
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitOutputStream.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.common.bitStream;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * A bit-level output stream that writes bits to an underlying byte-oriented OutputStream. Bits are
+ * written in MSB-first (most significant bit first) order within each byte.
+ */
+public class BitOutputStream extends BitStream {
+
+  protected OutputStream out;
+  protected int buffer; // Bit buffer (8-bit)
+  protected int bufferBitCount; // Number of bits currently in the buffer
+
+  protected int bitsWritten; // Total number of bits written
+
+  /**
+   * Constructs a BitOutputStream from the given OutputStream.
+   *
+   * @param out the underlying OutputStream
+   */
+  public BitOutputStream(OutputStream out) {
+    this.out = out;
+    this.buffer = 0;
+    this.bufferBitCount = 0;
+    this.bitsWritten = 0;
+  }
+
+  /**
+   * Re-targets this writer at another OutputStream and clears all bit-level state. Bits still
+   * sitting in the buffer are discarded, not written to the previous stream.
+   *
+   * @param out the new underlying OutputStream
+   */
+  public void reset(OutputStream out) {
+    this.out = out;
+    this.buffer = 0;
+    this.bufferBitCount = 0;
+    this.bitsWritten = 0;
+  }
+
+  /**
+   * Writes the specified number of bits from the given integer. Bits are taken from the lower bits
+   * of the data and written MSB-first.
+   *
+   * @param data the data to write (bits from the LSB end)
+   * @param numBits number of bits to write (0 to 32)
+   * @throws IllegalArgumentException if {@code numBits} is not between 0 and 32
+   * @throws IOException if an I/O error occurs
+   */
+  public void writeInt(int data, int numBits) throws IOException {
+    // Same guard as writeLong: out-of-range counts would silently wrap the shift distances.
+    if (numBits > Integer.SIZE || numBits < 0) {
+      throw new IllegalArgumentException("numBits must be between 0 and 32");
+    }
+
+    bitsWritten += numBits;
+    while (numBits > 0) {
+      int rest = BITS_PER_BYTE - bufferBitCount;
+
+      if (rest > numBits) {
+        // Everything left fits in the current byte without filling it.
+        buffer |= ((data & MASKS[numBits]) << (rest - numBits));
+        bufferBitCount += numBits;
+        numBits = 0;
+      } else {
+        // Fill the current byte with the highest pending bits and emit it.
+        buffer |= ((data >> (numBits - rest)) & MASKS[rest]);
+        out.write(buffer);
+        buffer = 0;
+        bufferBitCount = 0;
+        numBits -= rest;
+      }
+    }
+  }
+
+  /**
+   * Writes the specified number of bits from the given long value. Bits are taken from the lower
+   * bits of the data and written MSB-first.
+   *
+   * @param data the data to write (bits from the LSB end)
+   * @param numBits number of bits to write (0 to 64)
+   * @throws IllegalArgumentException if {@code numBits} is not between 0 and 64
+   * @throws IOException if an I/O error occurs
+   */
+  public void writeLong(long data, int numBits) throws IOException {
+    if (numBits > Long.SIZE || numBits < 0) {
+      throw new IllegalArgumentException("numBits must be between 0 and 64");
+    }
+
+    bitsWritten += numBits;
+    while (numBits > 0) {
+      int rest = BITS_PER_BYTE - bufferBitCount;
+
+      if (rest > numBits) {
+        int shift = rest - numBits;
+        int toWrite = (int) ((data & MASKS[numBits]) << shift);
+        buffer |= toWrite;
+        bufferBitCount += numBits;
+        numBits = 0;
+      } else {
+        int shift = numBits - rest;
+        int toWrite = (int) ((data >> shift) & MASKS[rest]);
+        buffer |= toWrite;
+        out.write(buffer);
+        buffer = 0;
+        bufferBitCount = 0;
+        numBits -= rest;
+      }
+    }
+  }
+
+  /**
+   * Writes an int as a ZigZag-encoded variable-length value: groups of 7 data bits, least
+   * significant group first, each followed by one continuation bit (1 = another group follows).
+   *
+   * @param value the signed value to write
+   * @param out the stream to write to
+   * @return the number of bits written (8 per group)
+   * @throws IOException if an I/O error occurs
+   */
+  public static int writeVarInt(int value, BitOutputStream out) throws IOException {
+    // ZigZag encoding: even for positive, odd for negative
+    int uValue = (value << 1) ^ (value >> 31);
+    int bits = 0;
+
+    while ((uValue & ~0x7F) != 0) {
+      out.writeInt(uValue & 0x7F, 7); // Write lower 7 bits
+      out.writeBit(true); // Continuation flag 1
+      uValue >>>= 7;
+      bits += 8;
+    }
+
+    out.writeInt(uValue, 7); // Last 7 bits
+    out.writeBit(false); // Termination flag 0
+    bits += 8;
+
+    return bits;
+  }
+
+  /**
+   * Writes a long as a ZigZag-encoded variable-length value: groups of 7 data bits, least
+   * significant group first, each followed by one continuation bit (1 = another group follows).
+   *
+   * @param value the signed value to write
+   * @param out the stream to write to
+   * @return the number of bits written (8 per group)
+   * @throws IOException if an I/O error occurs
+   */
+  public static int writeVarLong(long value, BitOutputStream out) throws IOException {
+    // ZigZag encoding: even for positive, odd for negative
+    long uValue = (value << 1) ^ (value >> 63);
+    int bits = 0; // local tally; deliberately not named like the bitsWritten field
+
+    while ((uValue & ~0x7FL) != 0) {
+      int chunk = (int) (uValue & 0x7F); // Lower 7 bits
+      out.writeInt(chunk, 7); // Write data bits
+      out.writeBit(true); // Has more data
+      uValue >>>= 7;
+      bits += 8;
+    }
+
+    out.writeInt((int) (uValue & 0x7F), 7); // Last group
+    out.writeBit(false); // End flag
+    bits += 8;
+    return bits;
+  }
+
+  /**
+   * Writes a single bit. The bit is stored in the buffer until a full byte is collected.
+   *
+   * @param bit true to write 1, false to write 0
+   * @throws IOException if an I/O error occurs
+   */
+  public void writeBit(boolean bit) throws IOException {
+    bitsWritten += 1;
+
+    buffer |= (bit ? 1 : 0) << (7 - bufferBitCount);
+    bufferBitCount++;
+
+    if (bufferBitCount == BITS_PER_BYTE) {
+      out.write(buffer);
+      buffer = 0;
+      bufferBitCount = 0;
+    }
+  }
+
+  /**
+   * Flushes the remaining bits in the buffer to the stream (if any), padding the remaining bits
+   * with zeros in the lower positions, and then closes the underlying OutputStream. The buffer is
+   * cleared once flushed, so calling this method again does not emit the padded byte twice.
+   *
+   * @throws IOException if an I/O error occurs
+   */
+  public void close() throws IOException {
+    try {
+      if (bufferBitCount > 0) {
+        out.write(buffer);
+        buffer = 0;
+        bufferBitCount = 0;
+      }
+    } finally {
+      // Release the underlying stream even when the final write fails.
+      out.close();
+    }
+  }
+
+  /**
+   * Returns the total number of bits written so far.
+   *
+   * @return the number of bits written
+   */
+  public int getBitsWritten() {
+    return bitsWritten;
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitStream.java
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitStream.java
new file mode 100644
index 00000000..83720e3f
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/BitStream.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.common.bitStream;
+
+/**
+ * Common parent of the bit-level stream classes. It carries no state of its own, only the
+ * constants those classes share for bitwise manipulation.
+ */
+public class BitStream {
+
+  /** Width of one byte, in bits. */
+  protected static final int BITS_PER_BYTE = 8;
+
+  /**
+   * Lookup table of low-bit masks: {@code MASKS[n]} has exactly the lowest {@code n} bits set, for
+   * {@code n} from 0 to 8. So {@code MASKS[0] == 0x00}, {@code MASKS[1] == 0x01}, {@code MASKS[2]
+   * == 0x03}, and so on up to {@code MASKS[8] == 0xff}.
+   */
+  protected static final int[] MASKS = {0x00, 0x01, 0x03, 0x07, 0x0f, 0x1f, 0x3f, 0x7f, 0xff};
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/ByteBufferBackedInputStream.java
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/ByteBufferBackedInputStream.java
new file mode 100644
index 00000000..e8a35aa4
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/common/bitStream/ByteBufferBackedInputStream.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.common.bitStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class ByteBufferBackedInputStream extends InputStream {
+ private final ByteBuffer buf;
+ private final int startPos;
+
+ public ByteBufferBackedInputStream(ByteBuffer buf) {
+ this.buf = buf;
+ this.startPos = buf.position();
+ }
+
+ @Override
+ public int read() throws IOException {
+ if (!buf.hasRemaining()) {
+ return -1;
+ }
+ return buf.get() & 0xFF;
+ }
+
+ @Override
+ public int available() {
+ return buf.remaining();
+ }
+
+ public int getConsumed() {
+ return buf.position() - startPos;
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/CamelDecoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/CamelDecoder.java
new file mode 100644
index 00000000..4744fa78
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/CamelDecoder.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.common.bitStream.BitInputStream;
+import org.apache.tsfile.common.bitStream.ByteBufferBackedInputStream;
+import org.apache.tsfile.exception.encoding.TsFileDecodingException;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+/**
+ * Decoder for {@code DOUBLE} values written with {@link TSEncoding#CAMEL}.
+ *
+ * <p>When fed from a {@link ByteBuffer}, input is consumed one block at a time: a var-int giving
+ * the block's length in bits, followed by the bit stream itself. A block is decoded completely on
+ * first access and its values are then served from an internal cache. Inside a block the first
+ * value is stored as 64 raw bits; every later value starts with a sign bit and a type bit that
+ * selects either the Camel path (integer delta plus decimal part) or the nested Gorilla XOR path.
+ */
+public class CamelDecoder extends Decoder {
+  // === Constants for decoding ===
+  private static final int BITS_FOR_SIGN = 1;
+  private static final int BITS_FOR_TYPE = 1;
+  private static final int BITS_FOR_FIRST_VALUE = 64;
+  private static final int BITS_FOR_LEADING_ZEROS = 6;
+  private static final int BITS_FOR_SIGNIFICANT_BITS = 6;
+  private static final int BITS_FOR_DECIMAL_COUNT = 4;
+  private static final int DOUBLE_TOTAL_BITS = 64;
+  private static final int DOUBLE_MANTISSA_BITS = 52;
+  private static final int DECIMAL_MAX_COUNT = 15;
+
+  // === Camel state ===
+  private long previousValue = 0; // raw bits of the previously decoded double
+  private boolean isFirst = true; // true until the first value of a block has been read
+  private long storedVal = 0; // running integer part, updated by var-long deltas
+
+  private double scale; // 10^count of the most recently read decimal part
+
+  // === Precomputed tables ===
+  /** {@code powers[i]} holds 10 raised to the power {@code i + 1}. */
+  public static final long[] powers = new long[DECIMAL_MAX_COUNT];
+
+  /** {@code threshold[i]} holds {@code powers[i]} divided by 2 raised to the power {@code i + 1}. */
+  public static final long[] threshold = new long[DECIMAL_MAX_COUNT];
+
+  static {
+    for (int l = 1; l <= DECIMAL_MAX_COUNT; l++) {
+      int idx = l - 1;
+      powers[idx] = (long) Math.pow(10, l);
+      long divisor = 1L << l;
+      threshold[idx] = powers[idx] / divisor;
+    }
+  }
+
+  private BitInputStream in;
+  private final GorillaDecoder gorillaDecoder;
+
+  // Cache for batch decoding
+  private double[] valueCache = new double[0];
+  private int cacheIndex = 0;
+  private int cacheSize = 0;
+
+  // Growable scratch buffer used by getValues while it collects a block
+  private double[] valuesBuffer = new double[16];
+
+  /**
+   * Creates a decoder that reads one bit stream directly from the given InputStream.
+   *
+   * @param inputStream the stream holding the encoded bits
+   * @param totalBits the number of valid bits in the stream
+   */
+  public CamelDecoder(InputStream inputStream, long totalBits) {
+    super(TSEncoding.CAMEL);
+    // Initialize bit-level reader and nested Gorilla decoder
+    this.in = new BitInputStream(inputStream, totalBits);
+    this.gorillaDecoder = new GorillaDecoder();
+  }
+
+  /** Creates a decoder that takes its input from the ByteBuffer passed to each read call. */
+  public CamelDecoder() {
+    super(TSEncoding.CAMEL);
+    this.gorillaDecoder = new GorillaDecoder();
+  }
+
+  @Override
+  public boolean hasNext(ByteBuffer buffer) throws IOException {
+    if (cacheIndex < cacheSize) {
+      return true;
+    }
+    if (in != null && in.availableBits() > 0) {
+      return true;
+    }
+    return buffer.hasRemaining();
+  }
+
+  @Override
+  public void reset() {
+    this.in = null;
+    resetBlockState();
+    // Also drop values already decoded from the previous input; otherwise hasNext/readDouble
+    // would keep serving them after the reset.
+    this.cacheIndex = 0;
+    this.cacheSize = 0;
+  }
+
+  /**
+   * Returns the next decoded value, decoding a whole new block into the cache when the cache has
+   * been used up.
+   *
+   * @param buffer the buffer holding the encoded blocks; its position is advanced past each block
+   *     that gets decoded
+   * @return the next value
+   * @throws TsFileDecodingException if there is no more data, a block is empty, or reading fails
+   */
+  @Override
+  public double readDouble(ByteBuffer buffer) {
+    try {
+      if (cacheIndex >= cacheSize) {
+        if (in != null && in.availableBits() > 0) {
+          // A stream handed to the constructor has not been decoded yet: decode it now instead
+          // of indexing into an empty cache.
+          cacheValues(getValues());
+        } else {
+          loadNextBlock(buffer);
+        }
+      }
+      return valueCache[cacheIndex++];
+    } catch (IOException e) {
+      TsFileDecodingException wrapped = new TsFileDecodingException(e.getMessage());
+      // Keep the original exception (and its stack trace) attached for diagnosis.
+      wrapped.initCause(e);
+      throw wrapped;
+    }
+  }
+
+  /** Restores the per-block decoding state to its initial values. */
+  private void resetBlockState() {
+    this.isFirst = true;
+    this.previousValue = 0L;
+    this.storedVal = 0L;
+    this.gorillaDecoder.leadingZeros = Integer.MAX_VALUE;
+    this.gorillaDecoder.trailingZeros = 0;
+  }
+
+  /** Installs freshly decoded values as the cache; an empty result is treated as corrupt data. */
+  private void cacheValues(double[] values) {
+    if (values.length == 0) {
+      throw new TsFileDecodingException("Unexpected empty block");
+    }
+    valueCache = values;
+    cacheSize = values.length;
+    cacheIndex = 0;
+  }
+
+  /** Reads the next length-prefixed block from the buffer, decodes it and advances the buffer. */
+  private void loadNextBlock(ByteBuffer buffer) throws IOException {
+    if (!buffer.hasRemaining()) {
+      throw new TsFileDecodingException("No more data to decode");
+    }
+    // Read through a slice so the caller's position only moves once the block is decoded.
+    ByteBuffer slice = buffer.slice();
+    ByteBufferBackedInputStream bais = new ByteBufferBackedInputStream(slice);
+    int blockBits = ReadWriteForEncodingUtils.readVarInt(bais);
+    this.in = new BitInputStream(bais, blockBits);
+    resetBlockState();
+    cacheValues(getValues());
+    buffer.position(buffer.position() + bais.getConsumed());
+  }
+
+  /** Nested class to handle fallback encoding (Gorilla) for double values. */
+  public class GorillaDecoder {
+    private int leadingZeros = Integer.MAX_VALUE;
+    private int trailingZeros = 0;
+
+    /**
+     * Decode next value using Gorilla algorithm. Reads and updates the enclosing decoder's
+     * {@code isFirst} and {@code previousValue} state.
+     *
+     * @param in the bit stream to read from
+     * @return the decoded value
+     * @throws IOException if reading from the bit stream fails
+     */
+    public double decode(BitInputStream in) throws IOException {
+      if (isFirst) {
+        previousValue = in.readLong(BITS_FOR_FIRST_VALUE);
+        isFirst = false;
+        return Double.longBitsToDouble(previousValue);
+      }
+
+      // Control bit 0: value identical to the previous one.
+      boolean controlBit = in.readBit();
+      if (!controlBit) {
+        return Double.longBitsToDouble(previousValue);
+      }
+
+      // Second bit 0: reuse the previous leading/trailing-zero window; 1: a new window follows.
+      boolean reuseBlock = !in.readBit();
+      long xor;
+      if (reuseBlock) {
+        int sigBits = DOUBLE_TOTAL_BITS - leadingZeros - trailingZeros;
+        if (sigBits == 0) {
+          return Double.longBitsToDouble(previousValue);
+        }
+        xor = in.readLong(sigBits) << trailingZeros;
+      } else {
+        leadingZeros = in.readInt(BITS_FOR_LEADING_ZEROS);
+        // The significant-bit count is stored minus one.
+        int sigBits = in.readInt(BITS_FOR_SIGNIFICANT_BITS) + 1;
+        trailingZeros = DOUBLE_TOTAL_BITS - leadingZeros - sigBits;
+        xor = in.readLong(sigBits) << trailingZeros;
+      }
+
+      previousValue ^= xor;
+      return Double.longBitsToDouble(previousValue);
+    }
+  }
+
+  /** Retrieve nested GorillaDecoder. */
+  public GorillaDecoder getGorillaDecoder() {
+    return gorillaDecoder;
+  }
+
+  /**
+   * Read all values until the current bit stream is exhausted. Values are collected in the
+   * internal, growable {@code valuesBuffer}; the returned array is a copy trimmed to the number of
+   * values read.
+   *
+   * @return the decoded values, possibly empty
+   * @throws IOException if reading from the bit stream fails
+   */
+  public double[] getValues() throws IOException {
+    int count = 0;
+    while (in.availableBits() > 0) {
+      double val = next();
+      if (count == valuesBuffer.length) {
+        valuesBuffer = Arrays.copyOf(valuesBuffer, valuesBuffer.length * 2);
+      }
+      valuesBuffer[count++] = val;
+    }
+    return Arrays.copyOf(valuesBuffer, count);
+  }
+
+  /** Decode the next value: 64 raw bits for the first value of a block, a coded value after. */
+  private double next() throws IOException {
+    double result;
+    if (isFirst) {
+      isFirst = false;
+      long firstBits = in.readLong(BITS_FOR_FIRST_VALUE);
+      result = Double.longBitsToDouble(firstBits);
+      // Seed the running integer part with the truncated first value.
+      storedVal = (long) result;
+    } else {
+      result = nextValue();
+    }
+    previousValue = Double.doubleToLongBits(result);
+    return result;
+  }
+
+  /** Decode according to Camel vs Gorilla path. */
+  private double nextValue() throws IOException {
+    // Read sign bit
+    int signBit = in.readInt(BITS_FOR_SIGN);
+    double sign = signBit == 1 ? -1.0 : 1.0;
+    // Read encoding type bit
+    int typeBit = in.readInt(BITS_FOR_TYPE);
+    boolean useCamel = typeBit == 1;
+
+    if (useCamel) {
+      long intPart = readLong();
+      // decimal = decPart / scale; readDecimal also sets scale, so it must run before the math
+      double decPart = readDecimal();
+      double value =
+          (intPart >= 0
+              ? (intPart * scale + decPart) / scale
+              : -(intPart * scale + decPart) / scale);
+      return sign * value;
+    } else {
+      return sign * gorillaDecoder.decode(in);
+    }
+  }
+
+  /** Read variable-length integer diff and update storedVal. */
+  private long readLong() throws IOException {
+    long diff = BitInputStream.readVarLong(in);
+    storedVal += diff;
+    return storedVal;
+  }
+
+  /** Read and reconstruct decimal component, scaled to an integer number of 10^-count units. */
+  private double readDecimal() throws IOException {
+    // The digit count is stored minus one in 4 bits, so the raw range is 1..16.
+    int count = in.readInt(BITS_FOR_DECIMAL_COUNT) + 1;
+    if (count > DECIMAL_MAX_COUNT) {
+      // The powers table has no entry for this count; report corrupt input instead of letting
+      // the lookup below fail with an ArrayIndexOutOfBoundsException.
+      throw new TsFileDecodingException("Invalid decimal digit count: " + count);
+    }
+    boolean hasXor = in.readBit();
+    long xor = 0;
+    if (hasXor) {
+      long bits = in.readLong(count);
+      // Align the stored bits with the top of the double's mantissa.
+      xor = bits << (DOUBLE_MANTISSA_BITS - count);
+    }
+    long mVal = BitInputStream.readVarLong(in);
+    double frac;
+    if (hasXor) {
+      double base = (double) mVal / powers[count - 1] + 1;
+      long merged = xor ^ Double.doubleToLongBits(base);
+      frac = Double.longBitsToDouble(merged) - 1;
+    } else {
+      frac = (double) mVal / powers[count - 1];
+    }
+    // Round to original scale
+    scale = Math.pow(10, count);
+    return Math.round(frac * scale);
+  }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
index 36c3d826..e0a96016 100644
--- a/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
+++ b/java/tsfile/src/main/java/org/apache/tsfile/encoding/decoder/Decoder.java
@@ -177,6 +177,13 @@ public abstract class Decoder {
default:
throw new TsFileDecodingException(String.format(ERROR_MSG,
encoding, dataType));
}
+ case CAMEL:
+ switch (dataType) {
+ case DOUBLE:
+ return new CamelDecoder();
+ default:
+ throw new TsFileDecodingException(String.format(ERROR_MSG,
encoding, dataType));
+ }
default:
throw new TsFileDecodingException(String.format(ERROR_MSG, encoding,
dataType));
}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/CamelEncoder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/CamelEncoder.java
new file mode 100644
index 00000000..9156963b
--- /dev/null
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/CamelEncoder.java
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.encoder;
+
+import org.apache.tsfile.common.bitStream.BitOutputStream;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.utils.ReadWriteForEncodingUtils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+
+public class CamelEncoder extends Encoder {
+ private final GorillaEncoder gorillaEncoder;
+
+ // === Constants for encoding ===
+ private static final int BITS_FOR_SIGN = 1;
+ private static final int BITS_FOR_TYPE = 1;
+ private static final int BITS_FOR_FIRST_VALUE = 64;
+ private static final int BITS_FOR_LEADING_ZEROS = 6;
+ private static final int BITS_FOR_SIGNIFICANT_BITS = 6;
+ private static final int BITS_FOR_DECIMAL_COUNT = 4;
+ private static final int DOUBLE_TOTAL_BITS = 64;
+ private static final int DOUBLE_MANTISSA_BITS = 52;
+ private static final int DECIMAL_MAX_COUNT = 10;
+
+ // === Camel state ===
+ private long storedVal = 0;
+ private boolean isFirst = true;
+ long previousValue = 0;
+ private boolean hasPending = false; // guard for empty or duplicate flush
+
+ // === Precomputed tables ===
+ public static final long[] powers = new long[DECIMAL_MAX_COUNT];
+ public static final long[] threshold = new long[DECIMAL_MAX_COUNT];
+
+ private final BitOutputStream out;
+ private final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+ // Fills the lookup tables for l = 1..DECIMAL_MAX_COUNT (index l - 1):
+ // powers[l - 1] = 10^l and threshold[l - 1] = 10^l / 2^l (integer division).
+ static {
+ for (int l = 1; l <= DECIMAL_MAX_COUNT; l++) {
+ int idx = l - 1;
+ powers[idx] = (long) Math.pow(10, l);
+ long divisor = 1L << l;
+ threshold[idx] = powers[idx] / divisor;
+ }
+ }
+
+ /**
+  * Creates a CAMEL encoder whose bit stream writes into an internal byte buffer and which owns a
+  * nested {@link GorillaEncoder} used as fallback.
+  */
+ public CamelEncoder() {
+ super(TSEncoding.CAMEL);
+ out = new BitOutputStream(baos);
+ gorillaEncoder = new GorillaEncoder();
+ }
+
+ /**
+  * Encodes a double value into the internal bit buffer and marks that there is pending data to
+  * flush.
+  *
+  * @param value the double value to encode
+  * @param out unused here; encoded bits stay in the internal buffer until {@code flush} is called
+  * @throws java.io.UncheckedIOException if writing to the internal bit buffer fails
+  */
+ @Override
+ public void encode(double value, ByteArrayOutputStream out) {
+   try {
+     this.addValue(value);
+     hasPending = true;
+   } catch (IOException e) {
+     // Swallowing the error would silently drop the value from the block, so surface it.
+     // The fully qualified name avoids touching the file's import list.
+     throw new java.io.UncheckedIOException("Failed to encode value with CAMEL: " + value, e);
+   }
+ }
+
+ /**
+  * Writes the buffered block to {@code out} and starts a new block.
+  *
+  * <p>The block is emitted as a var-int holding the number of bits written, followed by the
+  * buffered bytes. Afterwards the internal buffer, the bit stream and the encoder state are
+  * reset. When nothing has been encoded since the last flush, the call does nothing.
+  *
+  * @param out the destination ByteArrayOutputStream to write flushed data
+  * @throws IOException if an I/O error occurs during flush
+  */
+ @Override
+ public void flush(ByteArrayOutputStream out) throws IOException {
+   if (hasPending) {
+     int bitCount = close();
+     ReadWriteForEncodingUtils.writeVarInt(bitCount, out);
+     baos.writeTo(out);
+     baos.reset();
+     // "this.out" is the internal bit stream; the parameter "out" is the destination.
+     this.out.reset(baos);
+     resetState();
+     hasPending = false;
+   }
+ }
+
+ /**
+  * Puts the encoder back into its initial state so the next value starts a new block: clears
+  * the nested Gorilla bookkeeping, the Camel state and the pending flag.
+  */
+ private void resetState() {
+   // nested Gorilla state
+   gorillaEncoder.trailingZeros = 0;
+   gorillaEncoder.leadingZeros = Integer.MAX_VALUE;
+   // Camel state
+   hasPending = false;
+   previousValue = 0L;
+   storedVal = 0L;
+   isFirst = true;
+ }
+
+ /**
+  * Returns the size, in bytes, reported for a single encoded item.
+  *
+  * <p>NOTE(review): the Gorilla fallback in this class can write 1 sign bit + 1 type bit + 2
+  * control bits + 6 + 6 header bits + up to 64 payload bits (80 bits) for one value, which is
+  * more than 8 bytes — confirm whether callers need a strict upper bound.
+  */
+ @Override
+ public int getOneItemMaxSize() {
+ return 8;
+ }
+
+ /**
+  * Returns a size figure in bytes: one byte for the bit stream, the bytes currently held in the
+  * internal buffer, and 8 bytes each for {@code storedVal} and {@code previousValue}.
+  */
+ @Override
+ public long getMaxByteSize() {
+ // bitstream buffer | bytes buffer | storedVal | previousValue
+ return 1 + this.baos.size() + 8 + 8;
+ }
+
+ /**
+  * XOR-based fallback encoder. It is a non-static inner class and reads and writes the
+  * enclosing encoder's {@code isFirst} and {@code previousValue} fields.
+  */
+ public class GorillaEncoder {
+ // Leading/trailing zero counts of the last XOR block whose header was written.
+ private int leadingZeros = Integer.MAX_VALUE;
+ private int trailingZeros = 0;
+
+ /**
+  * Writes {@code value} to {@code out}: the raw 64 bits when the enclosing encoder's {@code
+  * isFirst} flag is set, otherwise an XOR against {@code previousValue}.
+  *
+  * @param value the value to encode
+  * @param out the bit stream to write to
+  * @throws IOException if writing to {@code out} fails
+  */
+ public void encode(double value, BitOutputStream out) throws IOException {
+ long curr = Double.doubleToLongBits(value);
+ if (isFirst) {
+ out.writeLong(curr, BITS_FOR_FIRST_VALUE);
+ previousValue = curr;
+ isFirst = false;
+ return;
+ }
+
+ long xor = curr ^ previousValue;
+ if (xor == 0) {
+ out.writeBit(false); // Control bit: same as previous
+ } else {
+ out.writeBit(true); // Control bit: value changed
+ int leading = Long.numberOfLeadingZeros(xor);
+ int trailing = Long.numberOfTrailingZeros(xor);
+ // The stored window can be reused only if the new XOR fits inside it.
+ if (leading >= leadingZeros && trailing >= trailingZeros) {
+ out.writeBit(false); // Reuse previous block info
+ int significantBits = DOUBLE_TOTAL_BITS - leadingZeros -
trailingZeros;
+ out.writeLong(xor >>> trailingZeros, significantBits);
+ } else {
+ out.writeBit(true); // Write new block info
+ out.writeInt(leading, BITS_FOR_LEADING_ZEROS);
+ int significantBits = DOUBLE_TOTAL_BITS - leading - trailing;
+ // Stored minus one so that a length of 64 fits into the 6-bit field.
+ out.writeInt(significantBits - 1, BITS_FOR_SIGNIFICANT_BITS);
+ out.writeLong(xor >>> trailing, significantBits);
+ leadingZeros = leading;
+ trailingZeros = trailing;
+ }
+ }
+
+ previousValue = curr;
+ }
+
+ /**
+  * Closes the given bit stream.
+  *
+  * @param out the bit stream to close
+  * @throws IOException if closing fails
+  */
+ public void close(BitOutputStream out) throws IOException {
+ out.close();
+ }
+ }
+
+ /**
+  * Appends one value to the current block. The first value of a block is written as its raw 64
+  * bits; every later value goes through {@code compressValue}. Afterwards {@code previousValue}
+  * holds the bit pattern of {@code value}.
+  *
+  * @param value the value to append
+  * @throws IOException if writing to the bit stream fails
+  */
+ public void addValue(double value) throws IOException {
+   if (!isFirst) {
+     compressValue(value);
+   } else {
+     writeFirst(Double.doubleToRawLongBits(value));
+   }
+   previousValue = Double.doubleToLongBits(value);
+ }
+
+ /**
+  * Writes the first value of a block as 64 raw bits, clears {@code isFirst} and seeds {@code
+  * storedVal} with that value cast to {@code long}.
+  *
+  * @param value the bit pattern of the first double
+  * @throws IOException if writing to the bit stream fails
+  */
+ private void writeFirst(long value) throws IOException {
+ isFirst = false;
+ storedVal = (long) Double.longBitsToDouble(value);
+ out.writeLong(value, BITS_FOR_FIRST_VALUE);
+ }
+
+ /**
+  * Closes the internal bit stream.
+  *
+  * @return the number of bits reported by the bit stream after closing
+  * @throws IOException if closing the bit stream fails
+  */
+ public int close() throws IOException {
+ out.close();
+ return out.getBitsWritten();
+ }
+
+ /**
+  * Encodes one non-first value: writes the sign bit, then either the CAMEL form (integer delta
+  * plus decimal digits) or the Gorilla XOR form of the absolute value.
+  *
+  * <p>The Gorilla form is used when the absolute value is NaN, zero, larger than {@code
+  * Long.MAX_VALUE}, has a decimal exponent whose magnitude exceeds {@code DECIMAL_MAX_COUNT}, or
+  * needs more than {@code DECIMAL_MAX_COUNT} integer-plus-decimal digits.
+  *
+  * @param value the value to encode
+  * @throws IOException if writing to the bit stream fails
+  */
+ private void compressValue(double value) throws IOException {
+   int signBit = (int) ((Double.doubleToLongBits(value) >>> (DOUBLE_TOTAL_BITS - 1)) & 1);
+   out.writeInt(signBit, BITS_FOR_SIGN);
+
+   value = Math.abs(value);
+   // NaN fails every comparison below, so without the explicit check it would take the CAMEL
+   // path and be written as integer 0 / decimal 0, i.e. silently turn into 0.0.
+   if (Double.isNaN(value)
+       || value > Long.MAX_VALUE
+       || value == 0
+       || Math.abs(Math.floor(Math.log10(value))) > DECIMAL_MAX_COUNT) {
+     out.writeInt(CamelInnerEncodingType.GORILLA.getCode(), BITS_FOR_TYPE);
+     gorillaEncoder.encode(value, out);
+     return;
+   }
+
+   // Count the decimal digits of the integer part.
+   long integerPart = (long) value;
+   int numDigits = 1;
+   long absInt = Math.abs(integerPart);
+   while (absInt >= 10) {
+     absInt /= 10;
+     numDigits++;
+   }
+
+   // Find how many decimal places are needed until value * 10^k is a whole number, giving up
+   // once the total digit budget is exceeded.
+   double factor = 1;
+   int decimalCount = 0;
+   while (Math.abs(value * factor - Math.round(value * factor)) > 0) {
+     factor *= 10.0;
+     decimalCount++;
+     if (numDigits + decimalCount > DECIMAL_MAX_COUNT) {
+       break;
+     }
+   }
+
+   // At least one decimal digit is always written.
+   decimalCount = Math.max(1, decimalCount);
+   long decimalValue;
+
+   if (decimalCount + numDigits <= DECIMAL_MAX_COUNT) {
+     long pow = powers[decimalCount - 1];
+     decimalValue = Math.round(value * pow) % pow;
+
+     out.writeInt(CamelInnerEncodingType.CAMEL.getCode(), BITS_FOR_TYPE);
+     compressIntegerValue(integerPart);
+     compressDecimalValue(decimalValue, decimalCount);
+   } else {
+     out.writeInt(CamelInnerEncodingType.GORILLA.getCode(), BITS_FOR_TYPE);
+     gorillaEncoder.encode(value, out);
+   }
+ }
+
+ /**
+  * Writes the integer part as a variable-length delta against {@code storedVal} and makes
+  * {@code value} the new {@code storedVal}.
+  *
+  * @param value the integer part to encode
+  * @throws IOException if writing to the bit stream fails
+  */
+ private void compressIntegerValue(long value) throws IOException {
+   long previous = storedVal;
+   storedVal = value;
+   BitOutputStream.writeVarLong(value - previous, out);
+ }
+
+ /**
+  * Writes the decimal part: the digit count minus one, a flag bit, optional XOR bits and a
+  * variable-length integer.
+  *
+  * <p>When {@code decimalValue} is at least {@code threshold[decimalCount - 1]}, the flag is
+  * set, the remainder modulo the threshold is kept as {@code m}, and {@code decimalCount} bits
+  * taken from the XOR of the bit patterns of {@code decimalValue / 10^decimalCount + 1} and
+  * {@code m / 10^decimalCount + 1} are written. Otherwise the flag is cleared and {@code
+  * decimalValue} itself is written as {@code m}.
+  *
+  * @param decimalValue the decimal digits as an integer
+  * @param decimalCount the number of decimal digits, at least 1
+  * @throws IOException if writing to the bit stream fails
+  */
+ private void compressDecimalValue(long decimalValue, int decimalCount)
throws IOException {
+ // Stored minus one; the decoder adds one back.
+ out.writeInt(decimalCount - 1, BITS_FOR_DECIMAL_COUNT);
+ long thresh = threshold[decimalCount - 1];
+ int m = (int) decimalValue;
+
+ if (decimalValue >= thresh) {
+ out.writeBit(true);
+ m = (int) (decimalValue % thresh);
+
+ long xor =
+ Double.doubleToLongBits((double) decimalValue / powers[decimalCount
- 1] + 1)
+ ^ Double.doubleToLongBits((double) m / powers[decimalCount - 1]
+ 1);
+
+ // Write decimalCount bits of the XOR, starting at bit DOUBLE_MANTISSA_BITS - decimalCount.
+ out.writeLong(xor >>> (DOUBLE_MANTISSA_BITS - decimalCount),
decimalCount);
+ } else {
+ out.writeBit(false);
+ }
+
+ BitOutputStream.writeVarLong(m, out);
+ }
+
+ /** Returns the bit count reported by the internal bit stream. */
+ public int getWrittenBits() {
+ return out.getBitsWritten();
+ }
+
+ /** Returns the internal byte buffer itself (not a copy). */
+ public ByteArrayOutputStream getByteArrayOutputStream() {
+ return this.baos;
+ }
+
+ /** Returns the nested Gorilla encoder owned by this encoder. */
+ public GorillaEncoder getGorillaEncoder() {
+ return gorillaEncoder;
+ }
+
+ /**
+  * Per-value form marker. The encoder writes {@link #getCode()} in {@code BITS_FOR_TYPE} bits
+  * after the sign bit.
+  */
+ public enum CamelInnerEncodingType {
+ GORILLA(0),
+ CAMEL(1);
+
+ // Numeric code written to the bit stream for this form.
+ private final int code;
+
+ CamelInnerEncodingType(int code) {
+ this.code = code;
+ }
+
+ /** Returns the numeric code of this form. */
+ public int getCode() {
+ return code;
+ }
+ }
+}
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
index 68c7e56b..7849607c 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/encoding/encoder/TSEncodingBuilder.java
@@ -78,6 +78,8 @@ public abstract class TSEncodingBuilder {
return new Sprintz();
case RLBE:
return new RLBE();
+ case CAMEL:
+ return new Camel();
default:
throw new UnsupportedOperationException(type.toString());
}
@@ -265,6 +267,25 @@ public abstract class TSEncodingBuilder {
}
}
+ /** for DOUBLE. */
+ public static class Camel extends TSEncodingBuilder {
+
+   /**
+    * Creates a CAMEL encoder.
+    *
+    * @param type the data type to encode; only DOUBLE is supported
+    * @return a new {@link CamelEncoder}
+    * @throws UnSupportedDataTypeException if {@code type} is not DOUBLE
+    */
+   @Override
+   public Encoder getEncoder(TSDataType type) {
+     switch (type) {
+       case DOUBLE:
+         return new CamelEncoder();
+       default:
+         // Name the encoding that actually rejected the type (was a copy of "GORILLA_V1").
+         throw new UnSupportedDataTypeException("CAMEL doesn't support data type: " + type);
+     }
+   }
+
+   @Override
+   public void initFromProps(Map<String, String> props) {
+     // allowed do nothing
+   }
+ }
+
/** for INT32, INT64. */
public static class Regular extends TSEncodingBuilder {
diff --git
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
index 20374376..d2ea1931 100644
---
a/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
+++
b/java/tsfile/src/main/java/org/apache/tsfile/file/metadata/enums/TSEncoding.java
@@ -41,7 +41,8 @@ public enum TSEncoding {
FREQ((byte) 10),
CHIMP((byte) 11),
SPRINTZ((byte) 12),
- RLBE((byte) 13);
+ RLBE((byte) 13),
+ CAMEL((byte) 14);
private final byte type;
@SuppressWarnings("java:S2386") // used by other projects
@@ -78,6 +79,7 @@ public enum TSEncoding {
floatSet.add(TSEncoding.CHIMP);
floatSet.add(TSEncoding.SPRINTZ);
floatSet.add(TSEncoding.RLBE);
+ floatSet.add(TSEncoding.CAMEL);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.FLOAT, floatSet);
TYPE_SUPPORTED_ENCODINGS.put(TSDataType.DOUBLE, floatSet);
@@ -135,6 +137,8 @@ public enum TSEncoding {
return TSEncoding.SPRINTZ;
case 13:
return TSEncoding.RLBE;
+ case 14:
+ return TSEncoding.CAMEL;
default:
throw new IllegalArgumentException("Invalid input: " + encoding);
}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/common/bitStream/TestBitStream.java
b/java/tsfile/src/test/java/org/apache/tsfile/common/bitStream/TestBitStream.java
new file mode 100644
index 00000000..447f582f
--- /dev/null
+++
b/java/tsfile/src/test/java/org/apache/tsfile/common/bitStream/TestBitStream.java
@@ -0,0 +1,346 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.common.bitStream;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import static org.apache.tsfile.common.bitStream.BitInputStream.readVarLong;
+import static org.apache.tsfile.common.bitStream.BitOutputStream.writeVarInt;
+import static org.apache.tsfile.common.bitStream.BitOutputStream.writeVarLong;
+
+public class TestBitStream {
+
+ @Test
+ public void testWriteAndReadInt() throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+
+ out.writeInt(0, 0); // No-op write
+ out.writeInt(0x78563412, 32); // Full int
+ out.writeInt(2, 4); // Partial int
+ out.writeInt(3, 3);
+ out.writeInt(0, 1);
+ out.writeInt(0xA8, 8); // One byte
+ out.writeInt(0x11, 6); // 6 bits
+ out.close();
+
+ byte[] expected = new byte[] {0x78, 0x56, 0x34, 0x12, 0x26, (byte) 0xA8,
0x44};
+ Assert.assertArrayEquals(expected, bout.toByteArray());
+ }
+
+ @Test
+ public void testBitInputWithMarkAndEOF() throws IOException {
+ byte[] data = new byte[] {0x12, 0x34, 0x56, 0x78, 0x32, (byte) 0xA8, 0x11};
+ BitInputStream in = new BitInputStream(new ByteArrayInputStream(data),
data.length * 8);
+
+ Assert.assertTrue(in.markSupported());
+ Assert.assertEquals(56, in.availableBits());
+ Assert.assertEquals(0, in.readInt(0));
+ Assert.assertEquals(0x12345678, in.readInt(32));
+ Assert.assertEquals(3, in.readInt(4));
+
+ in.mark(200);
+ Assert.assertEquals(1, in.readInt(3));
+ Assert.assertEquals(0, in.readInt(1));
+ Assert.assertEquals(0xA8, in.readInt(8));
+
+ in.reset();
+ Assert.assertEquals(2, in.readInt(4));
+ Assert.assertEquals(0xA8, in.readInt(8));
+
+ Assert.assertEquals(8, in.availableBits());
+ Assert.assertEquals(0x110, in.readInt(12));
+
+ try {
+ in.readInt(1);
+ Assert.fail("Expected EOFException");
+ } catch (EOFException ignored) {
+ }
+
+ in.close();
+ }
+
+ @Test
+ public void testWriteAndReadLong() throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+
+ long[] values = {0L, 1L, 0xFFFFFFFFL, 0x123456789ABCDEFL, Long.MAX_VALUE,
Long.MIN_VALUE};
+ int[] bits = {1, 2, 32, 60, 64, 64};
+
+ for (int i = 0; i < values.length; i++) {
+ out.writeLong(values[i], bits[i]);
+ }
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ for (int i = 0; i < values.length; i++) {
+ long actual = in.readLong(bits[i]);
+ Assert.assertEquals("Mismatch at index " + i, values[i], actual);
+ }
+ in.close();
+ }
+
+ @Test
+ public void testWriteAndReadBits() throws IOException {
+ boolean[] bits = {true, false, true, true, false, false, false, true,
false, true};
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ for (boolean b : bits) {
+ out.writeBit(b);
+ }
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ for (int i = 0; i < bits.length; i++) {
+ boolean actual = in.readBit();
+ Assert.assertEquals("Bit mismatch at index " + i, bits[i], actual);
+ }
+
+ try {
+ in.readBit();
+ Assert.fail("Expected EOFException");
+ } catch (EOFException ignored) {
+ }
+
+ in.close();
+ }
+
+ @Test
+ public void testLongBitWidths() throws IOException {
+ for (int bits = 1; bits <= 64; bits++) {
+ long value = (1L << (bits - 1)) | 1L;
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ out.writeLong(value, bits);
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ long result = in.readLong(bits);
+ Assert.assertEquals("Failed at bit width = " + bits, value, result);
+ in.close();
+ }
+ }
+
+ @Test
+ public void testAllZerosAndAllOnesLong() throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+
+ out.writeLong(0L, 64);
+ out.writeLong(-1L, 64);
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ Assert.assertEquals(0L, in.readLong(64));
+ Assert.assertEquals(-1L, in.readLong(64));
+ in.close();
+ }
+
+ @Test
+ public void testBitBoundaryCrossing() throws IOException {
+ boolean[] bits = {
+ false,
+ true,
+ true,
+ false,
+ true,
+ false,
+ false,
+ true, // first byte
+ true,
+ true,
+ false,
+ true,
+ false,
+ true,
+ true // crosses byte
+ };
+
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ for (boolean b : bits) {
+ out.writeBit(b);
+ }
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ for (int i = 0; i < bits.length; i++) {
+ Assert.assertEquals("Mismatch at bit index " + i, bits[i], in.readBit());
+ }
+ in.close();
+ }
+
+ @Test
+ public void testMixedLongAndBit() throws IOException {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+
+ out.writeLong(0x1FL, 5); // 11111
+ out.writeBit(true); // 1
+ out.writeBit(false); // 0
+ out.writeBit(true); // 1
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+
+ Assert.assertEquals(0x1F, in.readLong(5));
+ Assert.assertTrue(in.readBit());
+ Assert.assertFalse(in.readBit());
+ Assert.assertTrue(in.readBit());
+
+ in.close();
+ }
+
+ @Test
+ public void testVarLongSymmetry() throws IOException {
+ long[] testValues = {
+ 0, 1, -1, 63, -63, 64, -64, 128, -128, 1024, -1024, Long.MAX_VALUE,
Long.MIN_VALUE
+ };
+
+ for (long original : testValues) {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ writeVarLong(original, out);
+ out.close();
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ long decoded = readVarLong(in);
+ in.close();
+
+ Assert.assertEquals("Mismatch for value: " + original, original,
decoded);
+ }
+ }
+
+ @Test
+ public void testVarLongContinuousRange() throws IOException {
+ for (int value = -10000; value <= 10000; value++) {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ writeVarLong(value, out);
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ long decoded = readVarLong(in);
+ in.close();
+
+ Assert.assertEquals("Mismatch in range test for: " + value, value,
decoded);
+ }
+ }
+
+ @Test
+ public void testVarLongBitLengthGrowth() throws IOException {
+ long[] values = {0, 1, 2, 64, 128, 8192, 1 << 20, Long.MAX_VALUE / 2};
+ int lastBits = 0;
+
+ for (long value : values) {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ int bits = writeVarLong(value, out);
+ out.close();
+
+ Assert.assertTrue("Bit length didn't increase for " + value, bits >=
lastBits);
+ lastBits = bits;
+ }
+ }
+
+ @Test
+ public void testVarIntSymmetry() throws IOException {
+ int[] values = {
+ 0,
+ 1,
+ -1,
+ 63,
+ -63,
+ 127,
+ -128,
+ 255,
+ -256,
+ 1023,
+ -1023,
+ 16384,
+ -16384,
+ Integer.MAX_VALUE,
+ Integer.MIN_VALUE
+ };
+
+ for (int value : values) {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ writeVarInt(value, out);
+ out.close();
+
+ BitInputStream in =
+ new BitInputStream(new ByteArrayInputStream(bout.toByteArray()),
out.getBitsWritten());
+ int decoded = BitInputStream.readVarInt(in);
+ in.close();
+
+ Assert.assertEquals("Mismatch for value: " + value, value, decoded);
+ }
+ }
+
+ /** Round-trips every int in [-10000, 10000] through writeVarInt / readVarInt. */
+ @Test
+ public void testVarIntRange() throws IOException {
+   for (int value = -10000; value <= 10000; value++) {
+     ByteArrayOutputStream bout = new ByteArrayOutputStream();
+     BitOutputStream out = new BitOutputStream(bout);
+     writeVarInt(value, out);
+     out.close();
+
+     BitInputStream in =
+         new BitInputStream(new ByteArrayInputStream(bout.toByteArray()), out.getBitsWritten());
+     // readVarInt is static: call it on the class, as testVarIntSymmetry does.
+     int decoded = BitInputStream.readVarInt(in);
+     in.close();
+
+     Assert.assertEquals("Mismatch in range for value: " + value, value, decoded);
+   }
+ }
+
+ @Test
+ public void testBitLengthGrowth() throws IOException {
+ int[] values = {0, 1, 2, 64, 128, 1024, 16384, 1 << 20};
+ int lastBits = 0;
+
+ for (int value : values) {
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ BitOutputStream out = new BitOutputStream(bout);
+ int bits = writeVarInt(value, out);
+ out.close();
+
+ Assert.assertTrue("Bit length not increasing for value: " + value, bits
>= lastBits);
+ lastBits = bits;
+ }
+ }
+}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/CamelDecoderTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/CamelDecoderTest.java
new file mode 100644
index 00000000..cb72c19b
--- /dev/null
+++
b/java/tsfile/src/test/java/org/apache/tsfile/encoding/decoder/CamelDecoderTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tsfile.encoding.decoder;
+
+import org.apache.tsfile.common.bitStream.BitInputStream;
+import org.apache.tsfile.common.bitStream.BitOutputStream;
+import org.apache.tsfile.encoding.encoder.CamelEncoder;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Random;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class CamelDecoderTest {
+
+ /** Round-trips 10,000 pseudo-random finite doubles and expects bit-exact results. */
+ @Test
+ public void testRandomizedCompressDecompress() throws Exception {
+   // Fixed seed so that a failure can be reproduced; same seed as
+   // testBatchFlushForVariousBlockSizes.
+   Random random = new Random(42);
+   int sampleSize = 10_000;
+   double[] original = new double[sampleSize];
+
+   // Generate random test data (excluding NaN and ±Infinity)
+   for (int i = 0; i < sampleSize; i++) {
+     double v;
+     do {
+       long bits = random.nextLong();
+       v = Double.longBitsToDouble(bits);
+     } while (Double.isNaN(v) || Double.isInfinite(v));
+     original[i] = v;
+   }
+
+   compressDecompressAndAssert(original, 0);
+ }
+
+ private void compressDecompressAndAssert(double[] original, double
tolerance) throws Exception {
+ CamelEncoder encoder = new CamelEncoder();
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ for (double v : original) {
+ encoder.encode(v, bout);
+ }
+ encoder.flush(bout);
+ // Decode and verify
+ CamelDecoder decoder = new CamelDecoder();
+ ByteBuffer buffer = ByteBuffer.wrap(bout.toByteArray());
+
+ int i = 0;
+ while (decoder.hasNext(buffer)) {
+ double actual = decoder.readDouble(buffer);
+ double expected = original[i];
+ if (Double.isNaN(expected)) {
+ assertTrue("Expected NaN at index " + i, Double.isNaN(actual));
+ } else {
+ assertEquals("Mismatch at index " + i, expected, actual, tolerance);
+ }
+ i++;
+ }
+ assertEquals(original.length, i);
+ }
+
+ @Test
+ public void testSpecialFloatingValues() throws Exception {
+ double[] original =
+ new double[] {
+ Double.NaN,
+ Double.POSITIVE_INFINITY,
+ Double.NEGATIVE_INFINITY,
+ +0.0,
+ -0.0,
+ Double.MIN_VALUE,
+ -Double.MIN_VALUE,
+ Double.MIN_NORMAL,
+ -Double.MIN_NORMAL,
+ Double.MAX_VALUE,
+ -Double.MAX_VALUE
+ };
+ compressDecompressAndAssert(original, 0.0);
+ }
+
+ @Test
+ public void testMonotonicSequence() throws Exception {
+ double[] increasing = new double[500];
+ double[] decreasing = new double[500];
+ for (int i = 0; i < 500; i++) {
+ increasing[i] = 100.0 + i * 0.0001;
+ decreasing[i] = 100.0 - i * 0.0001;
+ }
+ compressDecompressAndAssert(increasing, 0);
+ compressDecompressAndAssert(decreasing, 0);
+ }
+
+ @Test
+ public void testPrecisionEdgeCases() throws Exception {
+ double[] original = {
+ 9007199254740991.0, // 2^53 - 1
+ 9007199254740992.0, // 2^53
+ 9007199254740993.0,
+ 1.0000000000000001, // Precision loss (equals 1.0)
+ 1.0000000000000002,
+ 12345,
+ 21332213
+ };
+ compressDecompressAndAssert(original, 0.0);
+ }
+
+ @Test
+ public void testAlternatingSignsAndDecimals() throws Exception {
+ double[] original = new double[2];
+ for (int i = 0; i < 2; i++) {
+ double base = i * 0.123456 % 1000;
+ original[i] = (i % 2 == 0) ? base : -base;
+ }
+ compressDecompressAndAssert(original, 0.0);
+ }
+
+ @Test
+ public void testMinimalDeltaSequence() throws Exception {
+ double[] original = new double[64];
+ double base = 100.0;
+ for (int i = 0; i < original.length; i++) {
+ original[i] = base + i * Math.ulp(base);
+ }
+ compressDecompressAndAssert(original, 0.0);
+ }
+
+ @Test
+ public void testRepeatedValues() throws Exception {
+ double repeated = 123.456789;
+ double[] original = new double[1000];
+ Arrays.fill(original, repeated);
+ compressDecompressAndAssert(original, 0.0);
+ }
+
+ /**
+  * Encodes {@code values} with the nested Gorilla encoder only, decodes them with the nested
+  * Gorilla decoder and expects bit-exact results.
+  *
+  * @param values the values to round-trip
+  */
+ private void testGorillaValues(double[] values) throws Exception {
+   ByteArrayOutputStream bout = new ByteArrayOutputStream();
+   BitOutputStream out = new BitOutputStream(bout);
+
+   CamelEncoder.GorillaEncoder encoder = new CamelEncoder().getGorillaEncoder();
+   for (double v : values) {
+     encoder.encode(v, out);
+   }
+   encoder.close(out);
+
+   byte[] encoded = bout.toByteArray();
+   BitInputStream in =
+       new BitInputStream(new ByteArrayInputStream(encoded), out.getBitsWritten());
+   InputStream inputStream = new ByteArrayInputStream(encoded);
+   CamelDecoder.GorillaDecoder decoder =
+       new CamelDecoder(inputStream, out.getBitsWritten()).getGorillaDecoder();
+
+   for (int idx = 0; idx < values.length; idx++) {
+     double actual = decoder.decode(in);
+     Assert.assertEquals("Mismatch decoding at index " + idx, values[idx], actual, 0.0);
+   }
+   in.close();
+ }
+
+ @Test
+ public void testGorillaAllZeros() throws Exception {
+ double[] values = new double[100];
+ Arrays.fill(values, 0.0);
+ testGorillaValues(values);
+ }
+
+ @Test
+ public void testGorillaConstantValue() throws Exception {
+ double[] values = new double[200];
+ Arrays.fill(values, 123456.789);
+ testGorillaValues(values);
+ }
+
+ @Test
+ public void testGorillaMinMaxValues() throws Exception {
+ double[] values = {
+ Double.MIN_VALUE, Double.MAX_VALUE, -Double.MAX_VALUE,
-Double.MIN_VALUE, 0.0, -0.0
+ };
+ testGorillaValues(values);
+ }
+
+ @Test
+ public void testGorillaMixedSigns() throws Exception {
+ double[] values = {-1.1, 2.2, -3.3, 4.4, -5.5, 6.6, -7.7};
+ testGorillaValues(values);
+ }
+
+ @Test
+ public void testGorillaHighPrecisionValues() throws Exception {
+ double[] values = {0.1, 0.2, 0.3, 0.1 + 0.2, 0.4 - 0.1};
+ testGorillaValues(values);
+ }
+
+ @Test
+ public void testGorillaXorEdgeTrigger() throws Exception {
+ double[] values = {
+ 1.00000001,
+ 1.00000002,
+ 1.00000003,
+ 1.00000001, // back to earlier value
+ 1.00000009
+ };
+ testGorillaValues(values);
+ }
+
+ @Test
+ public void testLargeSeries() throws Exception {
+ double[] values = new double[1000];
+ for (int i = 0; i < values.length; i++) {
+ values[i] = Math.sin(i / 10.0);
+ }
+ testGorillaValues(values);
+ }
+
+ private static final int[] FLUSH_SIZES = {32, 64, 128, 256, 512, 1000};
+ private static final int TOTAL_VALUES = 1_000_000;
+
+ @Test
+ public void testBatchFlushForVariousBlockSizes() throws IOException {
+ Random random = new Random(42);
+ for (int blockSize : FLUSH_SIZES) {
+ // Prepare encoder and output buffer
+ CamelEncoder encoder = new CamelEncoder();
+ ByteArrayOutputStream bout = new ByteArrayOutputStream();
+ double[] original = new double[TOTAL_VALUES];
+
+ // Generate random data and flush every blockSize values
+ for (int i = 0; i < TOTAL_VALUES; i++) {
+ double v;
+ do {
+ long bits = random.nextLong();
+ v = Double.longBitsToDouble(bits);
+ } while (Double.isNaN(v) || Double.isInfinite(v));
+ original[i] = v;
+ encoder.encode(v, bout);
+ if ((i + 1) % blockSize == 0) {
+ encoder.flush(bout);
+ }
+ }
+ // Final flush to cover trailing values
+ encoder.flush(bout);
+
+ // Decode and verify
+ CamelDecoder decoder = new CamelDecoder();
+ ByteBuffer buffer = ByteBuffer.wrap(bout.toByteArray());
+ for (int i = 0; i < TOTAL_VALUES; i++) {
+ Assert.assertTrue(
+ "Decoder should have next for blockSize=" + blockSize,
decoder.hasNext(buffer));
+ double decoded = decoder.readDouble(buffer);
+
+ Assert.assertEquals(
+ "Mismatch at index " + i + " for blockSize=" + blockSize,
original[i], decoded, 0);
+ }
+ Assert.assertFalse(
+ "Decoder should be exhausted after reading all values for
blockSize=" + blockSize,
+ decoder.hasNext(buffer));
+ }
+ }
+}
diff --git
a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileReadWriteTest.java
b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileReadWriteTest.java
index 7f74d162..3d91fffd 100644
--- a/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileReadWriteTest.java
+++ b/java/tsfile/src/test/java/org/apache/tsfile/write/TsFileReadWriteTest.java
@@ -151,7 +151,8 @@ public class TsFileReadWriteTest {
TSEncoding.RLE,
TSEncoding.TS_2DIFF,
TSEncoding.GORILLA_V1,
- TSEncoding.GORILLA);
+ TSEncoding.GORILLA,
+ TSEncoding.CAMEL);
for (TSEncoding encoding : encodings) {
doubleTest(encoding);
}