This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new f1dad367 Feature/cpp codec sprintz (#553)
f1dad367 is described below
commit f1dad36730f14cf0f7ae4b40274b18ab990ea9ad
Author: Hongzhi Gao <[email protected]>
AuthorDate: Wed Jul 23 19:12:06 2025 +0800
Feature/cpp codec sprintz (#553)
* implement sprintz codec
* adjusted decoder interface and made it consistence with java edition
* implements sprintz codec
* integrated the Sprintz codec
* add license
* fix some issues
---
cpp/src/common/allocator/byte_stream.h | 72 ++++
cpp/src/common/db_common.h | 1 +
cpp/src/common/global.cc | 2 +-
cpp/src/cwrapper/tsfile_cwrapper.h | 1 +
cpp/src/encoding/decoder.h | 2 +-
cpp/src/encoding/decoder_factory.h | 18 +
cpp/src/encoding/dictionary_decoder.h | 5 +-
cpp/src/encoding/double_sprintz_decoder.h | 223 ++++++++++++
cpp/src/encoding/double_sprintz_encoder.h | 175 ++++++++++
cpp/src/encoding/encode_utils.h | 18 +
cpp/src/encoding/encoder_factory.h | 18 +
cpp/src/encoding/fire.h | 106 ++++++
cpp/src/encoding/float_sprintz_decoder.h | 236 +++++++++++++
cpp/src/encoding/float_sprintz_encoder.h | 174 +++++++++
cpp/src/encoding/gorilla_decoder.h | 4 +-
cpp/src/encoding/int32_rle_decoder.h | 4 +-
cpp/src/encoding/int32_sprintz_decoder.h | 195 +++++++++++
cpp/src/encoding/int32_sprintz_encoder.h | 190 ++++++++++
cpp/src/encoding/int64_rle_decoder.h | 4 +-
cpp/src/encoding/int64_sprintz_decoder.h | 196 +++++++++++
cpp/src/encoding/int64_sprintz_encoder.h | 190 ++++++++++
cpp/src/encoding/plain_decoder.h | 4 +-
cpp/src/encoding/sprintz_decoder.h | 70 ++++
cpp/src/encoding/sprintz_encoder.h | 69 ++++
cpp/src/encoding/ts2diff_decoder.h | 3 +-
cpp/src/encoding/zigzag_decoder.h | 4 +-
cpp/src/reader/aligned_chunk_reader.cc | 13 +-
cpp/src/reader/aligned_chunk_reader.h | 4 +-
cpp/src/reader/chunk_reader.cc | 13 +-
cpp/src/reader/chunk_reader.h | 2 +-
cpp/test/common/allocator/byte_stream_test.cc | 32 ++
cpp/test/encoding/sprintz_codec_test.cc | 387 +++++++++++++++++++++
.../writer/table_view/tsfile_writer_table_test.cc | 16 +-
33 files changed, 2420 insertions(+), 31 deletions(-)
diff --git a/cpp/src/common/allocator/byte_stream.h
b/cpp/src/common/allocator/byte_stream.h
index 6984ecff..65c4b80a 100644
--- a/cpp/src/common/allocator/byte_stream.h
+++ b/cpp/src/common/allocator/byte_stream.h
@@ -824,6 +824,78 @@ class SerializationUtil {
ui64 = (ui64 << 8) | (buf[7] & 0xFF);
return ret;
}
+
+    // Writes the low-order bytes of `value` to `out`, least significant byte
+    // first, using ceil(bitWidth / 8) bytes.
+    //
+    // Returns E_TSFILE_CORRUPTED when more than 4 bytes would be needed,
+    // E_OK when the byte count is zero or negative (nothing is written), and
+    // otherwise the status reported by ByteStream::write_buf.
+    FORCE_INLINE static int write_int_little_endian_padded_on_bit_width(
+        int32_t value, ByteStream &out, int bitWidth) {
+        const int paddedByteNum = (bitWidth + 7) / 8;
+        if (paddedByteNum > 4) {
+            return E_TSFILE_CORRUPTED;
+        }
+        if (paddedByteNum <= 0) {
+            return E_OK;
+        }
+        const auto u = static_cast<uint32_t>(value);
+        uint8_t bytes[4];
+        for (int i = 0; i < paddedByteNum; ++i) {
+            bytes[i] = static_cast<uint8_t>((u >> (i * 8)) & 0xFF);
+        }
+        // A single write, whose status is propagated instead of being dropped.
+        return out.write_buf(bytes, static_cast<uint32_t>(paddedByteNum));
+    }
+
+    // 64-bit counterpart of write_int_little_endian_padded_on_bit_width:
+    // writes the low-order ceil(bit_width / 8) bytes of `value` to `out`,
+    // least significant byte first.
+    //
+    // Returns E_TSFILE_CORRUPTED when more than 8 bytes would be needed,
+    // E_OK when the byte count is zero or negative (nothing is written), and
+    // otherwise the status reported by ByteStream::write_buf.
+    FORCE_INLINE static int write_int64_little_endian_padded_on_bit_width(
+        int64_t value, ByteStream &out, int bit_width) {
+        const int padded_byte_num = (bit_width + 7) / 8;
+        if (padded_byte_num > 8) {
+            return E_TSFILE_CORRUPTED;
+        }
+        if (padded_byte_num <= 0) {
+            return E_OK;
+        }
+        const auto u = static_cast<uint64_t>(value);
+        uint8_t bytes[8];
+        for (int i = 0; i < padded_byte_num; ++i) {
+            bytes[i] = static_cast<uint8_t>((u >> (i * 8)) & 0xFF);
+        }
+        // A single write, whose status is propagated instead of being dropped.
+        return out.write_buf(bytes, static_cast<uint32_t>(padded_byte_num));
+    }
+
+    // Reads ceil(bitWidth / 8) bytes from `in` and assembles them, least
+    // significant byte first, into `out_val`.
+    //
+    // Returns E_TSFILE_CORRUPTED when the byte count is outside [0, 4], when
+    // the read fails, or when fewer bytes than requested are available;
+    // `out_val` is only written on success.
+    FORCE_INLINE static int read_int_little_endian_padded_on_bit_width(
+        ByteStream &in, int bitWidth, int32_t &out_val) {
+        const int padded_byte_num = (bitWidth + 7) / 8;
+        // A negative count would be converted to a huge unsigned length by
+        // read_buf and overrun the 4-byte buffer below, so reject it here.
+        if (padded_byte_num < 0 || padded_byte_num > 4) {
+            return E_TSFILE_CORRUPTED;
+        }
+        uint8_t buf[4] = {0};
+        uint32_t read_len = 0;
+        const int ret = in.read_buf(buf, padded_byte_num, read_len);
+        if (ret != E_OK || read_len != static_cast<uint32_t>(padded_byte_num)) {
+            return E_TSFILE_CORRUPTED;
+        }
+        uint32_t result = 0;
+        for (int i = 0; i < padded_byte_num; ++i) {
+            result |= static_cast<uint32_t>(buf[i]) << (i * 8);
+        }
+        out_val = static_cast<int32_t>(result);
+        return E_OK;
+    }
+
+    // Drains `in` into `out` through a temporary buffer of `chunk_size`
+    // bytes. Copying stops when `in` reports nothing remaining, when a read
+    // fails or yields zero bytes, or when a write fails (reported as
+    // E_ENCODE_ERR). The status of the last read is returned otherwise.
+    FORCE_INLINE static int chunk_read_all_data(ByteStream &in, ByteStream &out,
+                                                size_t chunk_size = 4096) {
+        int status = common::E_OK;
+        char *scratch = new char[chunk_size];
+        while (in.remaining_size() > 0) {
+            // Never request more than what is left in the input.
+            const size_t left = static_cast<size_t>(in.remaining_size());
+            const uint32_t want =
+                static_cast<uint32_t>(left < chunk_size ? left : chunk_size);
+
+            uint32_t got = 0;
+            status = in.read_buf(scratch, want, got);
+            if (status != E_OK || got == 0) {
+                break;
+            }
+            if (out.write_buf(scratch, got) != E_OK) {
+                status = common::E_ENCODE_ERR;
+                break;
+            }
+        }
+        delete[] scratch;
+        return status;
+    }
+
// caller guarantee buffer has at least 1 byte
FORCE_INLINE static uint8_t read_ui8(char *buffer) {
return *(uint8_t *)buffer;
diff --git a/cpp/src/common/db_common.h b/cpp/src/common/db_common.h
index 8d6465c6..485a0c10 100644
--- a/cpp/src/common/db_common.h
+++ b/cpp/src/common/db_common.h
@@ -69,6 +69,7 @@ enum TSEncoding : uint8_t {
GORILLA = 8,
ZIGZAG = 9,
FREQ = 10,
+ SPRINTZ = 12,
INVALID_ENCODING = 255
};
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index e2004037..21b3dad7 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -102,7 +102,7 @@ const char* s_data_type_names[8] = {"BOOLEAN", "INT32",
"INT64", "FLOAT",
const char* s_encoding_names[12] = {
"PLAIN", "DICTIONARY", "RLE", "DIFF", "TS_2DIFF", "BITMAP",
- "GORILLA_V1", "REGULAR", "GORILLA", "ZIGZAG", "FREQ"};
+ "GORILLA_V1", "REGULAR", "GORILLA", "ZIGZAG", "FREQ", "SPRINTZ"};
const char* s_compression_names[8] = {
"UNCOMPRESSED", "SNAPPY", "GZIP", "LZO", "SDT", "PAA", "PLA", "LZ4",
diff --git a/cpp/src/cwrapper/tsfile_cwrapper.h
b/cpp/src/cwrapper/tsfile_cwrapper.h
index 30727539..1f651f5d 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.h
+++ b/cpp/src/cwrapper/tsfile_cwrapper.h
@@ -52,6 +52,7 @@ typedef enum {
TS_ENCODING_GORILLA = 8,
TS_ENCODING_ZIGZAG = 9,
TS_ENCODING_FREQ = 10,
+ TS_ENCODING_SPRINTZ = 12,
TS_ENCODING_INVALID = 255
} TSEncoding;
diff --git a/cpp/src/encoding/decoder.h b/cpp/src/encoding/decoder.h
index b792ccfe..4dc24781 100644
--- a/cpp/src/encoding/decoder.h
+++ b/cpp/src/encoding/decoder.h
@@ -29,7 +29,7 @@ class Decoder {
Decoder() {}
virtual ~Decoder() {}
virtual void reset() = 0;
- virtual bool has_remaining() = 0;
+ virtual bool has_remaining(const common::ByteStream &buffer) = 0;
virtual int read_boolean(bool &ret_value, common::ByteStream &in) = 0;
virtual int read_int32(int32_t &ret_value, common::ByteStream &in) = 0;
virtual int read_int64(int64_t &ret_value, common::ByteStream &in) = 0;
diff --git a/cpp/src/encoding/decoder_factory.h
b/cpp/src/encoding/decoder_factory.h
index a9e725aa..37819cea 100644
--- a/cpp/src/encoding/decoder_factory.h
+++ b/cpp/src/encoding/decoder_factory.h
@@ -22,9 +22,13 @@
#include "decoder.h"
#include "dictionary_decoder.h"
+#include "double_sprintz_decoder.h"
#include "encoding/int32_rle_decoder.h"
#include "encoding/int64_rle_decoder.h"
+#include "float_sprintz_decoder.h"
#include "gorilla_decoder.h"
+#include "int32_sprintz_decoder.h"
+#include "int64_sprintz_decoder.h"
#include "plain_decoder.h"
#include "ts2diff_decoder.h"
#include "zigzag_decoder.h"
@@ -126,6 +130,20 @@ class DecoderFactory {
return nullptr;
}
+ case SPRINTZ:
+ switch (data_type) {
+ case INT32:
+ ALLOC_AND_RETURN_DECODER(Int32SprintzDecoder);
+ case INT64:
+ ALLOC_AND_RETURN_DECODER(Int64SprintzDecoder);
+ case FLOAT:
+ ALLOC_AND_RETURN_DECODER(FloatSprintzDecoder);
+ case DOUBLE:
+ ALLOC_AND_RETURN_DECODER(DoubleSprintzDecoder);
+ default:
+ return nullptr;
+ }
+
default:
// Not supported encoding
return nullptr;
diff --git a/cpp/src/encoding/dictionary_decoder.h
b/cpp/src/encoding/dictionary_decoder.h
index a6ee4a51..46214c34 100644
--- a/cpp/src/encoding/dictionary_decoder.h
+++ b/cpp/src/encoding/dictionary_decoder.h
@@ -37,8 +37,9 @@ class DictionaryDecoder : public Decoder {
public:
~DictionaryDecoder() override = default;
- bool has_remaining() {
- return !entry_index_.empty() && value_decoder_.has_next_package();
+    // True when the entry index is non-empty and the value decoder reports
+    // another package, or when `buffer` itself still has unread bytes.
+    // Marked `override` like the other Decoder methods of this class.
+    bool has_remaining(const common::ByteStream &buffer) override {
+        return (!entry_index_.empty() && value_decoder_.has_next_package()) ||
+               buffer.has_remaining();
+    }
int read_boolean(bool &ret_value, common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
diff --git a/cpp/src/encoding/double_sprintz_decoder.h
b/cpp/src/encoding/double_sprintz_decoder.h
new file mode 100644
index 00000000..7a3ab6db
--- /dev/null
+++ b/cpp/src/encoding/double_sprintz_decoder.h
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DOUBLE_SPRINTZ_DECODER_H
+#define DOUBLE_SPRINTZ_DECODER_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "encoding/fire.h"
+#include "gorilla_decoder.h"
+#include "int64_packer.h"
+#include "sprintz_decoder.h"
+
+namespace storage {
+
+// Sprintz block decoder for DOUBLE values.
+//
+// Every block starts with a one-byte header:
+//   * high bit set   -> the low 7 bits are the number of values that follow,
+//                       read one by one through DoubleGorillaDecoder;
+//   * high bit clear -> the header is the bit width of a packed block: one
+//                       raw double followed by `bit width` bytes holding
+//                       eight residuals packed with Int64Packer.
+// Residuals are zigzag-coded differences between IEEE-754 bit patterns; they
+// are turned back into values with the "delta" or the "fire" scheme
+// (see set_predict_method()).
+class DoubleSprintzDecoder : public SprintzDecoder {
+   public:
+    DoubleSprintzDecoder() : fire_pred_(3), predict_scheme_("fire") {
+        SprintzDecoder::reset();
+        // resize() value-initialises, so both buffers start zero-filled.
+        current_buffer_.resize(block_size_ + 1);
+        convert_buffer_.resize(block_size_);
+        pre_value_ = 0;
+        current_value_ = 0.0;
+        current_count_ = 0;
+        decode_size_ = 0;
+        is_block_read_ = false;
+        fire_pred_.reset();
+    }
+
+    ~DoubleSprintzDecoder() override = default;
+
+    // Selects how residuals are turned back into values: "delta" or "fire".
+    // Any other name makes packed blocks fail with E_DECODE_ERR.
+    void set_predict_method(const std::string& method) {
+        predict_scheme_ = method;
+    }
+
+    int read_boolean(bool& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_int32(int32_t& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_int64(int64_t& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    // Returns the next decoded value, decoding a new block from `in` first
+    // when the current one is exhausted. `ret_value` is untouched on error.
+    int read_double(double& ret_value, common::ByteStream& in) override {
+        int ret = common::E_OK;
+        if (!is_block_read_) {
+            if (RET_FAIL(decode_block(in))) {
+                return ret;
+            }
+        }
+        ret_value = current_buffer_[current_count_++];
+        if (current_count_ == static_cast<size_t>(decode_size_)) {
+            is_block_read_ = false;
+            current_count_ = 0;
+        }
+        return ret;
+    }
+    int read_float(float& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_String(common::String& ret_value, common::PageArena& pa,
+                    common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    void reset() override {
+        SprintzDecoder::reset();
+        pre_value_ = 0;
+        current_value_ = 0.0;
+        current_count_ = 0;
+        decode_size_ = 0;
+        is_block_read_ = false;
+        std::fill(current_buffer_.begin(), current_buffer_.end(), 0.0);
+        std::fill(convert_buffer_.begin(), convert_buffer_.end(), 0);
+        fire_pred_.reset();
+    }
+
+    // True while the current block still holds unread values, or while
+    // `input` has at least sizeof(uint32_t) + 1 bytes left.
+    bool has_remaining(const common::ByteStream& input) override {
+        int min_length = sizeof(uint32_t) + 1;
+        return (is_block_read_ && current_count_ < decode_size_) ||
+               input.remaining_size() >= min_length;
+    }
+
+   protected:
+    // Decodes one block from `input` into current_buffer_.
+    // Returns E_DECODE_ERR for a truncated or malformed block, or the error
+    // reported by the nested reader.
+    int decode_block(common::ByteStream& input) override {
+        int ret = common::E_OK;
+        uint8_t header = 0;
+        uint32_t read_len = 0;
+        ret = input.read_buf(&header, 1, read_len);
+        if (ret != common::E_OK || read_len != 1) {
+            return common::E_DECODE_ERR;
+        }
+        bit_width_ = static_cast<int32_t>(header);
+
+        if ((bit_width_ & (1 << 7)) != 0) {
+            // The header can announce up to 127 values but the buffer only
+            // has block_size_ + 1 slots; reject anything that does not fit
+            // (and an empty tail) instead of writing out of bounds.
+            const int tail_size = bit_width_ & ~(1 << 7);
+            if (tail_size <= 0 ||
+                static_cast<size_t>(tail_size) > current_buffer_.size()) {
+                return common::E_DECODE_ERR;
+            }
+            decode_size_ = tail_size;
+            DoubleGorillaDecoder decoder;
+            for (int i = 0; i < decode_size_; ++i) {
+                if (RET_FAIL(decoder.read_double(current_buffer_[i], input))) {
+                    return ret;
+                }
+            }
+        } else {
+            // A 64-bit residual can never need more than 64 bits.
+            if (bit_width_ > 64) {
+                return common::E_DECODE_ERR;
+            }
+            decode_size_ = block_size_ + 1;
+            if (RET_FAIL(common::SerializationUtil::read_double(pre_value_,
+                                                                input))) {
+                return ret;
+            }
+            current_buffer_[0] = pre_value_;
+            // Eight values of bit_width_ bits occupy exactly bit_width_ bytes.
+            std::vector<uint8_t> pack_buf(bit_width_);
+            read_len = 0;
+            ret = input.read_buf(reinterpret_cast<char*>(pack_buf.data()),
+                                 bit_width_, read_len);
+            if (ret != common::E_OK ||
+                read_len != static_cast<uint32_t>(bit_width_)) {
+                return common::E_DECODE_ERR;
+            }
+            Int64Packer packer(bit_width_);
+            packer.unpack_8values(pack_buf.data(), 0, convert_buffer_.data());
+            if (RET_FAIL(recalculate())) {
+                return ret;
+            }
+        }
+        is_block_read_ = true;
+        return ret;
+    }
+
+    // Turns the unpacked residuals in convert_buffer_ into bit patterns and
+    // stores the resulting doubles in current_buffer_[1..block_size_].
+    // All arithmetic on bit patterns wraps modulo 2^64.
+    int recalculate() override {
+        for (int i = 0; i < block_size_; ++i) {
+            convert_buffer_[i] = zigzag_decode(convert_buffer_[i]);
+        }
+
+        int64_t prev_bits = 0;
+        std::memcpy(&prev_bits, &current_buffer_[0], sizeof(prev_bits));
+
+        if (predict_scheme_ == "delta") {
+            for (int i = 0; i < block_size_; ++i) {
+                const int64_t bits =
+                    wrapping_add(prev_bits, convert_buffer_[i]);
+                convert_buffer_[i] = bits;
+                current_buffer_[i + 1] = bits_to_double(bits);
+                prev_bits = bits;
+            }
+        } else if (predict_scheme_ == "fire") {
+            fire_pred_.reset();
+            for (int i = 0; i < block_size_; ++i) {
+                const int64_t err = convert_buffer_[i];
+                const int64_t bits =
+                    wrapping_add(fire_pred_.predict(prev_bits), err);
+                convert_buffer_[i] = bits;
+                current_buffer_[i + 1] = bits_to_double(bits);
+                fire_pred_.train(prev_bits, bits, err);
+                prev_bits = bits;
+            }
+        } else {
+            return common::E_DECODE_ERR;
+        }
+        return common::E_OK;
+    }
+
+   private:
+    // Two's-complement addition without signed-overflow UB.
+    static int64_t wrapping_add(int64_t a, int64_t b) {
+        return static_cast<int64_t>(static_cast<uint64_t>(a) +
+                                    static_cast<uint64_t>(b));
+    }
+
+    // Inverse of the encoder mapping (r <= 0 -> -2r, r > 0 -> 2r - 1).
+    // Working on the unsigned pattern keeps residuals with |r| >= 2^62
+    // intact; signed division would drop their top bit. r == INT64_MIN
+    // collides with 0 on the encoder side and cannot be recovered here.
+    static int64_t zigzag_decode(int64_t v) {
+        const uint64_t u = static_cast<uint64_t>(v);
+        const uint64_t half = u >> 1;
+        return static_cast<int64_t>((u & 1) ? half + 1 : 0 - half);
+    }
+
+    static double bits_to_double(int64_t bits) {
+        double d;
+        std::memcpy(&d, &bits, sizeof(d));
+        return d;
+    }
+
+    double pre_value_;
+    double current_value_;
+    size_t current_count_;
+    int decode_size_;
+    bool is_block_read_ = false;
+
+    std::vector<double> current_buffer_;
+    std::vector<int64_t> convert_buffer_;
+    LongFire fire_pred_;
+    std::string predict_scheme_;
+};
+
+} // namespace storage
+
+#endif // DOUBLE_SPRINTZ_DECODER_H
diff --git a/cpp/src/encoding/double_sprintz_encoder.h
b/cpp/src/encoding/double_sprintz_encoder.h
new file mode 100644
index 00000000..1571dcca
--- /dev/null
+++ b/cpp/src/encoding/double_sprintz_encoder.h
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef DOUBLE_SPRINTZ_ENCODER_H
+#define DOUBLE_SPRINTZ_ENCODER_H
+
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "encoding/fire.h"
+#include "encoding/int64_packer.h"
+#include "gorilla_encoder.h"
+#include "sprintz_encoder.h"
+
+namespace storage {
+
+// Sprintz block encoder for DOUBLE values.
+//
+// Values are collected until block_size_ + 1 of them are available. The
+// first is stored raw, the others as zigzag-coded residuals between IEEE-754
+// bit patterns ("delta" or "fire", selected by predict_method_), bit-packed
+// with Int64Packer into byte_cache_. flush() copies the cached blocks to the
+// output and writes any incomplete tail through DoubleGorillaEncoder behind
+// a header byte whose high bit is set.
+class DoubleSprintzEncoder : public SprintzEncoder {
+   public:
+    DoubleSprintzEncoder() : fire_pred_(3) {
+        convert_buffer_.resize(block_size_);
+    }
+
+    ~DoubleSprintzEncoder() override = default;
+
+    void reset() override {
+        SprintzEncoder::reset();
+        values_.clear();
+    }
+
+    void destroy() override {}
+
+    int get_one_item_max_size() override {
+        return 1 + (1 + block_size_) * static_cast<int>(sizeof(int64_t));
+    }
+
+    int get_max_byte_size() override {
+        return 1 + (static_cast<int>(values_.size()) + 1) *
+                       static_cast<int>(sizeof(int64_t));
+    }
+
+    int encode(bool, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int encode(int32_t, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int encode(int64_t, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int encode(float, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    // Buffers `value`; once a full block is available it is packed into
+    // byte_cache_, and after group_max_ blocks the cache is flushed to
+    // `out_stream`. Returns E_OK or the error reported by flush().
+    int encode(double value, common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+        values_.push_back(value);
+        if (!is_first_cached_) {
+            // First value of a block: only remember it.
+            is_first_cached_ = true;
+            return ret;
+        }
+
+        if (values_.size() == static_cast<size_t>(block_size_) + 1) {
+            fire_pred_.reset();
+            for (int i = 1; i <= block_size_; ++i) {
+                convert_buffer_[i - 1] = predict(values_[i], values_[i - 1]);
+            }
+            bit_pack();
+            is_first_cached_ = false;
+            values_.clear();
+            group_num_++;
+            if (group_num_ == group_max_) {
+                if (RET_FAIL(flush(out_stream))) return ret;
+            }
+        }
+        return ret;
+    }
+    int encode(const common::String, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    // Copies the cached packed blocks to `out_stream`, then writes any
+    // buffered values that do not form a full block, and finally resets the
+    // encoder. On the first failure the error is returned and the encoder is
+    // left un-reset.
+    int flush(common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+        if (byte_cache_.total_size() > 0) {
+            if (RET_FAIL(common::SerializationUtil::chunk_read_all_data(
+                    byte_cache_, out_stream))) {
+                return ret;
+            }
+        }
+
+        if (!values_.empty()) {
+            // Header: value count with the high bit set to mark a tail block.
+            int size = static_cast<int>(values_.size());
+            size |= (1 << 7);
+            if (RET_FAIL(common::SerializationUtil::
+                             write_int_little_endian_padded_on_bit_width(
+                                 size, out_stream, 1))) {
+                return ret;
+            }
+            DoubleGorillaEncoder encoder;
+            for (double val : values_) {
+                if (RET_FAIL(encoder.encode(val, out_stream))) {
+                    return ret;
+                }
+            }
+            if (RET_FAIL(encoder.flush(out_stream))) {
+                return ret;
+            }
+        }
+
+        reset();
+        return ret;
+    }
+
+   protected:
+    // Appends one packed block to byte_cache_: bit width (1 byte), the first
+    // value (8 bytes), then the eight packed residuals (bit_width_ bytes).
+    // The first value is removed from values_ as a side effect.
+    // NOTE(review): this override returns void, so write failures on
+    // byte_cache_ cannot be reported from here.
+    void bit_pack() override {
+        const double pre_value = values_[0];
+        values_.erase(values_.begin());
+
+        bit_width_ = get_int64_max_bit_width(convert_buffer_);
+        Int64Packer packer(bit_width_);
+
+        std::vector<uint8_t> bytes(bit_width_);
+        packer.pack_8values(convert_buffer_.data(), 0, bytes.data());
+
+        common::SerializationUtil::write_int_little_endian_padded_on_bit_width(
+            bit_width_, byte_cache_, 1);
+        uint8_t buf[8];
+        common::double_to_bytes(pre_value, buf);
+        byte_cache_.write_buf(reinterpret_cast<const char*>(buf), 8);
+        byte_cache_.write_buf(reinterpret_cast<const char*>(bytes.data()),
+                              bytes.size());
+    }
+
+    // Returns the zigzag-coded residual of `value` given its predecessor.
+    // Bit patterns are combined modulo 2^64, so sequences that change sign
+    // no longer rely on signed-overflow behaviour.
+    int64_t predict(double value, double prev) {
+        const int64_t curr_bits = common::double_to_long(value);
+        const int64_t prev_bits = common::double_to_long(prev);
+        // Initialised so a build without ASSERT never reads garbage.
+        int64_t raw_pred = 0;
+        if (predict_method_ == "delta") {
+            raw_pred = wrapping_sub(curr_bits, prev_bits);
+        } else if (predict_method_ == "fire") {
+            const int64_t pred = fire_pred_.predict(prev_bits);
+            const int64_t err = wrapping_sub(curr_bits, pred);
+            fire_pred_.train(prev_bits, curr_bits, err);
+            raw_pred = err;
+        } else {
+            ASSERT(false);
+        }
+        // r <= 0 -> -2r, r > 0 -> 2r - 1, computed on the unsigned pattern.
+        const uint64_t u = static_cast<uint64_t>(raw_pred);
+        return static_cast<int64_t>(raw_pred <= 0 ? (0 - u) * 2 : u * 2 - 1);
+    }
+
+   private:
+    // Two's-complement subtraction without signed-overflow UB.
+    static int64_t wrapping_sub(int64_t a, int64_t b) {
+        return static_cast<int64_t>(static_cast<uint64_t>(a) -
+                                    static_cast<uint64_t>(b));
+    }
+
+    std::vector<double> values_;
+    std::vector<int64_t> convert_buffer_;
+    LongFire fire_pred_;
+};
+
+} // namespace storage
+
+#endif // DOUBLE_SPRINTZ_ENCODER_H
diff --git a/cpp/src/encoding/encode_utils.h b/cpp/src/encoding/encode_utils.h
index 7cbd2afc..a8be851c 100644
--- a/cpp/src/encoding/encode_utils.h
+++ b/cpp/src/encoding/encode_utils.h
@@ -81,6 +81,15 @@ FORCE_INLINE int32_t number_of_trailing_zeros(int32_t i) {
return n - static_cast<int32_t>((x << 1) >> 31);
}
+// Returns the largest number of significant bits (32 minus leading zeros)
+// needed by any element of `nums`; never less than 1, also for empty input.
+FORCE_INLINE int get_int32_max_bit_width(const std::vector<int32_t>& nums) {
+    int widest = 1;
+    for (size_t idx = 0; idx < nums.size(); ++idx) {
+        const int needed = 32 - number_of_leading_zeros(nums[idx]);
+        if (needed > widest) {
+            widest = needed;
+        }
+    }
+    return widest;
+}
+
FORCE_INLINE int32_t number_of_leading_zeros(int64_t i) {
if (i == 0) {
return 64;
@@ -145,5 +154,14 @@ FORCE_INLINE int32_t number_of_trailing_zeros(int64_t i) {
return n - static_cast<int32_t>((x << 1) >> 31);
}
+// Returns the largest number of significant bits (64 minus leading zeros)
+// needed by any element of `nums`; never less than 1, also for empty input.
+FORCE_INLINE int get_int64_max_bit_width(const std::vector<int64_t>& nums) {
+    int widest = 1;
+    for (size_t idx = 0; idx < nums.size(); ++idx) {
+        const int needed = 64 - number_of_leading_zeros(nums[idx]);
+        if (needed > widest) {
+            widest = needed;
+        }
+    }
+    return widest;
+}
+
} // end namespace storage
#endif // ENCODING_ENCODE_UTILS_H
diff --git a/cpp/src/encoding/encoder_factory.h
b/cpp/src/encoding/encoder_factory.h
index ab132389..e19aeecf 100644
--- a/cpp/src/encoding/encoder_factory.h
+++ b/cpp/src/encoding/encoder_factory.h
@@ -22,10 +22,14 @@
#include "common/global.h"
#include "dictionary_encoder.h"
+#include "double_sprintz_encoder.h"
#include "encoder.h"
#include "encoding/int32_rle_encoder.h"
#include "encoding/int64_rle_encoder.h"
+#include "float_sprintz_encoder.h"
#include "gorilla_encoder.h"
+#include "int32_sprintz_encoder.h"
+#include "int64_sprintz_encoder.h"
#include "plain_encoder.h"
#include "ts2diff_encoder.h"
#include "zigzag_encoder.h"
@@ -140,6 +144,20 @@ class EncoderFactory {
return nullptr;
}
+ case SPRINTZ:
+ switch (data_type) {
+ case INT32:
+ ALLOC_AND_RETURN_ENCODER(Int32SprintzEncoder);
+ case INT64:
+ ALLOC_AND_RETURN_ENCODER(Int64SprintzEncoder);
+ case FLOAT:
+ ALLOC_AND_RETURN_ENCODER(FloatSprintzEncoder);
+ case DOUBLE:
+ ALLOC_AND_RETURN_ENCODER(DoubleSprintzEncoder);
+ default:
+ return nullptr;
+ }
+
case DIFF:
case BITMAP:
case GORILLA_V1:
diff --git a/cpp/src/encoding/fire.h b/cpp/src/encoding/fire.h
new file mode 100644
index 00000000..9b319a17
--- /dev/null
+++ b/cpp/src/encoding/fire.h
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef ENCODING_FIRE_H
+#define ENCODING_FIRE_H
+
+#include <cstdint>
+
+// Shared state and interface of the predictors that the Sprintz codecs use
+// in their "fire" mode. T is the integer type of the values being predicted.
+template <typename T>
+class Fire {
+   public:
+    // `learning_rate` is stored as learn_shift_; every other field starts
+    // at zero.
+    explicit Fire(int learning_rate) : learn_shift_(learning_rate) {}
+
+    virtual ~Fire() = default;
+
+    // Returns the prediction for the value that follows `value`.
+    virtual T predict(T value) = 0;
+
+    // Updates the predictor with the previous value, the actual value and
+    // the residual that was observed.
+    virtual void train(T pre, T val, T err) = 0;
+
+    // Clears the learned state; learn_shift_ and bit_width_ are kept.
+    virtual void reset() {
+        delta_ = 0;
+        accumulator_ = 0;
+    }
+
+   protected:
+    int learn_shift_;
+    int bit_width_ = 0;
+    int accumulator_ = 0;
+    T delta_ = 0;
+};
+
+// 32-bit predictor. Inputs are raw bit patterns (the float codec passes
+// float bits), so sums and differences routinely exceed the int range, for
+// example between a negative and a positive float. All such arithmetic is
+// done modulo 2^32 through unsigned types instead of relying on
+// signed-overflow behaviour.
+class IntFire : public Fire<int> {
+   public:
+    explicit IntFire(int learning_rate) : Fire(learning_rate) {
+        bit_width_ = 8;
+        accumulator_ = 0;
+        delta_ = 0;
+    }
+
+    void reset() override {
+        accumulator_ = 0;
+        delta_ = 0;
+    }
+
+    // prediction = value + ((accumulator >> learn_shift) * delta >> bit_width)
+    int predict(int value) override {
+        const int alpha = accumulator_ >> learn_shift_;
+        // The product is formed in 64 bits and truncated to int before the
+        // shift, as before.
+        const int diff =
+            static_cast<int>(static_cast<int64_t>(alpha) * delta_) >>
+            bit_width_;
+        return wrapping_add(value, diff);
+    }
+
+    // Moves the accumulator by the previous delta (added when err > 0,
+    // subtracted otherwise) and records the new delta val - pre.
+    void train(int pre, int val, int err) override {
+        accumulator_ = err > 0 ? wrapping_add(accumulator_, delta_)
+                               : wrapping_sub(accumulator_, delta_);
+        delta_ = wrapping_sub(val, pre);
+    }
+
+   private:
+    static int wrapping_add(int a, int b) {
+        return static_cast<int>(static_cast<unsigned int>(a) +
+                                static_cast<unsigned int>(b));
+    }
+    static int wrapping_sub(int a, int b) {
+        return static_cast<int>(static_cast<unsigned int>(a) -
+                                static_cast<unsigned int>(b));
+    }
+};
+
+// 64-bit predictor. Inputs are raw bit patterns (the double codec passes
+// double bits), so sums, differences and products routinely exceed the
+// int64_t range. All such arithmetic is done modulo 2^64 through unsigned
+// types instead of relying on signed-overflow behaviour.
+class LongFire : public Fire<int64_t> {
+   public:
+    explicit LongFire(int learning_rate) : Fire(learning_rate) {
+        bit_width_ = 16;
+        accumulator_ = 0;
+        delta_ = 0;
+    }
+
+    void reset() override {
+        accumulator_ = 0;
+        delta_ = 0;
+    }
+
+    // prediction = value + ((accumulator >> learn_shift) * delta >> bit_width)
+    int64_t predict(int64_t value) override {
+        const int64_t alpha = accumulator_ >> learn_shift_;
+        const int64_t product = static_cast<int64_t>(
+            static_cast<uint64_t>(alpha) * static_cast<uint64_t>(delta_));
+        return wrapping_add(value, product >> bit_width_);
+    }
+
+    // Moves the accumulator by the previous delta (added when err > 0,
+    // subtracted otherwise) and records the new delta val - pre.
+    void train(int64_t pre, int64_t val, int64_t err) override {
+        const uint64_t acc =
+            static_cast<uint64_t>(static_cast<int64_t>(accumulator_));
+        const uint64_t step = static_cast<uint64_t>(delta_);
+        const uint64_t updated = err > 0 ? acc + step : acc - step;
+        // accumulator_ is an int: keep the low 32 bits, as the implicit
+        // narrowing of the compound assignment did before.
+        accumulator_ = static_cast<int>(static_cast<uint32_t>(updated));
+        delta_ = wrapping_sub(val, pre);
+    }
+
+   private:
+    static int64_t wrapping_add(int64_t a, int64_t b) {
+        return static_cast<int64_t>(static_cast<uint64_t>(a) +
+                                    static_cast<uint64_t>(b));
+    }
+    static int64_t wrapping_sub(int64_t a, int64_t b) {
+        return static_cast<int64_t>(static_cast<uint64_t>(a) -
+                                    static_cast<uint64_t>(b));
+    }
+};
+
+#endif // ENCODING_FIRE_H
diff --git a/cpp/src/encoding/float_sprintz_decoder.h
b/cpp/src/encoding/float_sprintz_decoder.h
new file mode 100644
index 00000000..319b2516
--- /dev/null
+++ b/cpp/src/encoding/float_sprintz_decoder.h
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef FLOAT_SPRINTZ_DECODER_H
+#define FLOAT_SPRINTZ_DECODER_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "encoding/fire.h"
+#include "gorilla_decoder.h"
+#include "int32_packer.h"
+#include "sprintz_decoder.h"
+
+namespace storage {
+
+// Sprintz block decoder for FLOAT values.
+//
+// Every block starts with a one-byte header:
+//   * high bit set   -> the low 7 bits are the number of values that follow,
+//                       read one by one through FloatGorillaDecoder;
+//   * high bit clear -> the header is the bit width of a packed block: one
+//                       raw float followed by `bit width` bytes holding
+//                       eight residuals packed with Int32Packer.
+// Residuals are zigzag-coded differences between IEEE-754 bit patterns; they
+// are turned back into values with the "delta" or the "fire" scheme
+// (see set_predict_method()).
+class FloatSprintzDecoder : public SprintzDecoder {
+   public:
+    FloatSprintzDecoder() : fire_pred_(2), predict_scheme_("fire") {
+        SprintzDecoder::reset();
+        // resize() value-initialises, so both buffers start zero-filled.
+        current_buffer_.resize(block_size_ + 1);
+        convert_buffer_.resize(block_size_);
+        pre_value_ = 0;
+        current_value_ = 0.0f;
+        current_count_ = 0;
+        decode_size_ = 0;
+        is_block_read_ = false;
+        fire_pred_.reset();
+    }
+
+    ~FloatSprintzDecoder() override = default;
+
+    // Selects how residuals are turned back into values: "delta" or "fire".
+    // Any other name makes packed blocks fail with E_DECODE_ERR.
+    void set_predict_method(const std::string &method) {
+        predict_scheme_ = method;
+    }
+
+    int read_boolean(bool &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_int32(int32_t &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_int64(int64_t &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_double(double &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_String(common::String &ret_value, common::PageArena &pa,
+                    common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    void reset() override {
+        SprintzDecoder::reset();
+        pre_value_ = 0;
+        current_value_ = 0.0f;
+        current_count_ = 0;
+        decode_size_ = 0;
+        is_block_read_ = false;
+        std::fill(current_buffer_.begin(), current_buffer_.end(), 0.0f);
+        std::fill(convert_buffer_.begin(), convert_buffer_.end(), 0);
+        fire_pred_.reset();
+    }
+
+    // True while the current block still holds unread values, or while
+    // `input` has at least sizeof(uint32_t) + 1 bytes left.
+    bool has_remaining(const common::ByteStream &input) override {
+        int min_length = sizeof(uint32_t) + 1;
+        return (is_block_read_ && current_count_ < decode_size_) ||
+               input.remaining_size() >= min_length;
+    }
+
+    // Returns the next decoded value, decoding a new block from `input`
+    // first when the current one is exhausted. `ret_value` is untouched on
+    // error.
+    int read_float(float &ret_value, common::ByteStream &input) override {
+        int ret = common::E_OK;
+        if (!is_block_read_) {
+            if (RET_FAIL(decode_block(input))) {
+                return ret;
+            }
+        }
+        ret_value = current_buffer_[current_count_++];
+        if (current_count_ == static_cast<size_t>(decode_size_)) {
+            is_block_read_ = false;
+            current_count_ = 0;
+        }
+        return ret;
+    }
+
+   protected:
+    // Decodes one block from `input` into current_buffer_.
+    // Returns E_DECODE_ERR for a truncated or malformed block, or the error
+    // reported by the nested reader.
+    int decode_block(common::ByteStream &input) override {
+        int ret = common::E_OK;
+        uint8_t header = 0;
+        uint32_t read_len = 0;
+        ret = input.read_buf(&header, 1, read_len);
+        if (ret != common::E_OK || read_len != 1) {
+            return common::E_DECODE_ERR;
+        }
+        bit_width_ = static_cast<int32_t>(header);
+
+        if ((bit_width_ & (1 << 7)) != 0) {
+            // The header can announce up to 127 values but the buffer only
+            // has block_size_ + 1 slots; reject anything that does not fit
+            // (and an empty tail) instead of writing out of bounds.
+            const int tail_size = bit_width_ & ~(1 << 7);
+            if (tail_size <= 0 ||
+                static_cast<size_t>(tail_size) > current_buffer_.size()) {
+                return common::E_DECODE_ERR;
+            }
+            decode_size_ = tail_size;
+            FloatGorillaDecoder decoder;
+            for (int i = 0; i < decode_size_; ++i) {
+                if (RET_FAIL(decoder.read_float(current_buffer_[i], input))) {
+                    return ret;
+                }
+            }
+        } else {
+            // A 32-bit residual can never need more than 32 bits.
+            if (bit_width_ > 32) {
+                return common::E_DECODE_ERR;
+            }
+            decode_size_ = block_size_ + 1;
+            if (RET_FAIL(common::SerializationUtil::read_float(pre_value_,
+                                                               input))) {
+                return ret;
+            }
+            current_buffer_[0] = pre_value_;
+            // Eight values of bit_width_ bits occupy exactly bit_width_ bytes.
+            std::vector<uint8_t> pack_buf(bit_width_);
+            read_len = 0;
+            ret = input.read_buf(reinterpret_cast<char *>(pack_buf.data()),
+                                 bit_width_, read_len);
+            if (ret != common::E_OK ||
+                read_len != static_cast<uint32_t>(bit_width_)) {
+                return common::E_DECODE_ERR;
+            }
+            Int32Packer packer(bit_width_);
+            packer.unpack_8values(pack_buf.data(), 0, convert_buffer_.data());
+            if (RET_FAIL(recalculate())) {
+                return ret;
+            }
+        }
+        is_block_read_ = true;
+        return ret;
+    }
+
+    // Turns the unpacked residuals in convert_buffer_ into bit patterns and
+    // stores the resulting floats in current_buffer_[1..block_size_].
+    // All arithmetic on bit patterns wraps modulo 2^32.
+    int recalculate() override {
+        for (int i = 0; i < block_size_; ++i) {
+            convert_buffer_[i] = zigzag_decode(convert_buffer_[i]);
+        }
+
+        int32_t prev_bits = 0;
+        std::memcpy(&prev_bits, &current_buffer_[0], sizeof(prev_bits));
+
+        if (predict_scheme_ == "delta") {
+            for (int i = 0; i < block_size_; ++i) {
+                const int32_t bits =
+                    wrapping_add(prev_bits, convert_buffer_[i]);
+                convert_buffer_[i] = bits;
+                current_buffer_[i + 1] = bits_to_float(bits);
+                prev_bits = bits;
+            }
+        } else if (predict_scheme_ == "fire") {
+            fire_pred_.reset();
+            for (int i = 0; i < block_size_; ++i) {
+                const int32_t err = convert_buffer_[i];
+                const int32_t bits =
+                    wrapping_add(fire_pred_.predict(prev_bits), err);
+                convert_buffer_[i] = bits;
+                current_buffer_[i + 1] = bits_to_float(bits);
+                fire_pred_.train(prev_bits, bits, err);
+                prev_bits = bits;
+            }
+        } else {
+            return common::E_DECODE_ERR;
+        }
+        return common::E_OK;
+    }
+
+   private:
+    // Two's-complement addition without signed-overflow UB.
+    static int32_t wrapping_add(int32_t a, int32_t b) {
+        return static_cast<int32_t>(static_cast<uint32_t>(a) +
+                                    static_cast<uint32_t>(b));
+    }
+
+    // Inverse of the encoder mapping (r <= 0 -> -2r, r > 0 -> 2r - 1).
+    // Working on the unsigned pattern keeps residuals with |r| >= 2^30
+    // intact; signed division would drop their top bit. r == INT32_MIN
+    // collides with 0 on the encoder side and cannot be recovered here.
+    static int32_t zigzag_decode(int32_t v) {
+        const uint32_t u = static_cast<uint32_t>(v);
+        const uint32_t half = u >> 1;
+        return static_cast<int32_t>((u & 1) ? half + 1 : 0 - half);
+    }
+
+    static float bits_to_float(int32_t bits) {
+        float f;
+        std::memcpy(&f, &bits, sizeof(f));
+        return f;
+    }
+
+    float pre_value_;
+    float current_value_;
+    size_t current_count_;
+    int decode_size_;
+    bool is_block_read_ = false;
+
+    std::vector<float> current_buffer_;
+    std::vector<int32_t> convert_buffer_;
+    IntFire fire_pred_;
+    std::string predict_scheme_;
+};
+
+} // namespace storage
+
+#endif // FLOAT_SPRINTZ_DECODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/float_sprintz_encoder.h
b/cpp/src/encoding/float_sprintz_encoder.h
new file mode 100644
index 00000000..01151a23
--- /dev/null
+++ b/cpp/src/encoding/float_sprintz_encoder.h
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef FLOAT_SPRINTZ_ENCODER_H
+#define FLOAT_SPRINTZ_ENCODER_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "encoding/encode_utils.h"
+#include "encoding/fire.h"
+#include "encoding/int32_rle_encoder.h"
+#include "gorilla_encoder.h"
+#include "int32_packer.h"
+#include "sprintz_encoder.h"
+
+namespace storage {
+
+/**
+ * Sprintz encoder for FLOAT values.
+ *
+ * Values are buffered in values_. Once block_size_ + 1 values are cached, the
+ * first one is stored verbatim and the following block_size_ values are turned
+ * into zigzag-mapped prediction residuals (computed on the raw float bits),
+ * bit-packed and appended to byte_cache_. After group_max_ packed blocks the
+ * cache is flushed to the output stream. Values that never fill a whole block
+ * are written by flush() through a FloatGorillaEncoder, preceded by one byte
+ * holding the value count with its top bit set.
+ */
+class FloatSprintzEncoder : public SprintzEncoder {
+ public:
+    FloatSprintzEncoder() : fire_pred_(2) {
+        convert_buffer_.resize(block_size_);
+    }
+
+    ~FloatSprintzEncoder() override = default;
+
+    /** Clears the shared encoder state and every buffered value. */
+    void reset() override {
+        SprintzEncoder::reset();
+        values_.clear();
+    }
+
+    void destroy() override {}
+
+    int get_one_item_max_size() override {
+        return 1 + (1 + block_size_) * sizeof(int32_t);
+    }
+
+    int get_max_byte_size() override {
+        return 1 + (values_.size() + 1) * sizeof(int32_t);
+    }
+
+    int encode(bool, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int encode(int32_t, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int encode(int64_t, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    /**
+     * Buffers one value; packs a block when block_size_ + 1 values are cached
+     * and flushes to out_stream once group_max_ blocks have been packed.
+     *
+     * @return common::E_OK, or the error reported by flush().
+     */
+    int encode(float value, common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+        values_.push_back(value);
+        if (!is_first_cached_) {
+            // The first value of a block is only remembered, never predicted.
+            is_first_cached_ = true;
+            return ret;
+        }
+
+        if (values_.size() == static_cast<size_t>(block_size_) + 1) {
+            fire_pred_.reset();
+            for (int i = 1; i <= block_size_; ++i) {
+                convert_buffer_[i - 1] = predict(values_[i], values_[i - 1]);
+            }
+            bit_pack();
+            is_first_cached_ = false;
+            values_.clear();
+            group_num_++;
+            if (group_num_ == group_max_) {
+                if (RET_FAIL(flush(out_stream))) return ret;
+            }
+        }
+        return ret;
+    }
+    int encode(double, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int encode(const common::String, common::ByteStream&) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    /**
+     * Writes all packed blocks from byte_cache_ to out_stream, followed by any
+     * values that did not fill a block (Gorilla-encoded behind a size byte
+     * whose top bit is set), then resets the encoder.
+     *
+     * @return common::E_OK, or the first error reported while copying the
+     *         cache or while encoding the trailing values.
+     */
+    int flush(common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+        if (byte_cache_.total_size() > 0) {
+            if (RET_FAIL(common::SerializationUtil::chunk_read_all_data(
+                    byte_cache_, out_stream))) {
+                return ret;
+            }
+        }
+
+        if (!values_.empty()) {
+            int size = static_cast<int>(values_.size());
+            size |= (1 << 7);
+            common::SerializationUtil::
+                write_int_little_endian_padded_on_bit_width(size, out_stream,
+                                                            1);
+            FloatGorillaEncoder encoder;
+            for (float val : values_) {
+                // Stop at the first failure instead of silently dropping it.
+                if (RET_FAIL(encoder.encode(val, out_stream))) {
+                    break;
+                }
+            }
+            if (ret == common::E_OK) {
+                ret = encoder.flush(out_stream);
+            }
+        }
+
+        reset();
+        return ret;
+    }
+
+ protected:
+    /**
+     * Appends one packed block to byte_cache_: the bit width (1 byte), the raw
+     * bytes of the first value (4 bytes) and the bit-packed residuals taken
+     * from convert_buffer_.
+     */
+    void bit_pack() override {
+        // extract and remove first value
+        float pre_bits = values_[0];
+        values_.erase(values_.begin());
+
+        bit_width_ = get_int32_max_bit_width(convert_buffer_);
+        packer_ = std::make_shared<Int32Packer>(bit_width_);
+
+        std::vector<uint8_t> bytes(bit_width_);
+        packer_->pack_8values(convert_buffer_.data(), 0, bytes.data());
+
+        common::SerializationUtil::write_int_little_endian_padded_on_bit_width(
+            bit_width_, byte_cache_, 1);
+        uint8_t buffer[4];
+        common::float_to_bytes(pre_bits, buffer);
+        byte_cache_.write_buf(reinterpret_cast<const char*>(buffer), 4);
+        byte_cache_.write_buf(reinterpret_cast<const char*>(bytes.data()),
+                              bytes.size());
+    }
+
+    /**
+     * Returns the zigzag-mapped residual of `value` against the prediction
+     * derived from `prev_value`, both taken as raw float bit patterns.
+     *
+     * All arithmetic is done on uint32_t so that large residuals wrap around
+     * instead of triggering signed-overflow undefined behaviour.
+     */
+    int32_t predict(float value, float prev_value) {
+        const int32_t curr_bits = common::float_to_int(value);
+        const int32_t prev_bits = common::float_to_int(prev_value);
+        // Initialised so release builds (ASSERT compiled out) never read an
+        // indeterminate value on the unsupported-method path.
+        uint32_t residual = 0;
+        if (predict_method_ == "delta") {
+            residual = static_cast<uint32_t>(curr_bits) -
+                       static_cast<uint32_t>(prev_bits);
+        } else if (predict_method_ == "fire") {
+            const int32_t pred = fire_pred_.predict(prev_bits);
+            residual = static_cast<uint32_t>(curr_bits) -
+                       static_cast<uint32_t>(pred);
+            fire_pred_.train(prev_bits, curr_bits,
+                             static_cast<int32_t>(residual));
+        } else {
+            // unsupported
+            ASSERT(false);
+        }
+        // Zigzag: r <= 0 -> -2r, r > 0 -> 2r - 1.
+        const uint32_t mapped = (static_cast<int32_t>(residual) <= 0)
+                                    ? (0u - 2u * residual)
+                                    : (2u * residual - 1u);
+        return static_cast<int32_t>(mapped);
+    }
+
+ private:
+    std::vector<float> values_;            // values of the block being built
+    std::vector<int32_t> convert_buffer_;  // zigzag residuals of that block
+    std::shared_ptr<Int32Packer> packer_;
+    IntFire fire_pred_;
+};
+
+} // namespace storage
+
+#endif // FLOAT_SPRINTZ_ENCODER_H
diff --git a/cpp/src/encoding/gorilla_decoder.h
b/cpp/src/encoding/gorilla_decoder.h
index 726bbf30..e2b62063 100644
--- a/cpp/src/encoding/gorilla_decoder.h
+++ b/cpp/src/encoding/gorilla_decoder.h
@@ -49,7 +49,9 @@ class GorillaDecoder : public Decoder {
}
FORCE_INLINE bool has_next() { return has_next_; }
- FORCE_INLINE bool has_remaining() { return has_next(); }
+ // Returns true while `buffer` still has unread bytes or has_next() reports
+ // another value.
+ FORCE_INLINE bool has_remaining(const common::ByteStream &buffer) {
+ return buffer.has_remaining() || has_next();
+ }
// If empty, cache 8 bits from in_stream to 'buffer_'.
void flush_byte_if_empty(common::ByteStream &in) {
diff --git a/cpp/src/encoding/int32_rle_decoder.h
b/cpp/src/encoding/int32_rle_decoder.h
index d80269f7..647f095a 100644
--- a/cpp/src/encoding/int32_rle_decoder.h
+++ b/cpp/src/encoding/int32_rle_decoder.h
@@ -55,7 +55,9 @@ class Int32RleDecoder : public Decoder {
tmp_buf_(nullptr) {}
~Int32RleDecoder() override { destroy(); }
- bool has_remaining() override { return has_next_package(); }
+ // Returns true while `buffer` still has unread bytes or has_next_package()
+ // reports more data.
+ bool has_remaining(const common::ByteStream &buffer) override {
+ return buffer.has_remaining() || has_next_package();
+ }
int read_boolean(bool &ret_value, common::ByteStream &in) {
int32_t bool_value;
read_int32(bool_value, in);
diff --git a/cpp/src/encoding/int32_sprintz_decoder.h
b/cpp/src/encoding/int32_sprintz_decoder.h
new file mode 100644
index 00000000..1a23692c
--- /dev/null
+++ b/cpp/src/encoding/int32_sprintz_decoder.h
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INT32_SPRINTZ_DECODER_H
+#define INT32_SPRINTZ_DECODER_H
+
+#include <iostream>
+#include <istream>
+#include <memory>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "encoding/fire.h"
+#include "encoding/int32_rle_decoder.h"
+#include "int32_packer.h"
+#include "sprintz_decoder.h"
+
+namespace storage {
+
+/**
+ * Sprintz decoder for INT32 values.
+ *
+ * A block starts with one header byte. If its top bit is set, the low seven
+ * bits give the number of values that follow, decoded with Int32RleDecoder.
+ * Otherwise the byte is the bit width of a packed block: a var-uint first
+ * value followed by 8 bit-packed zigzag residuals, giving block_size_ + 1
+ * values after reconstruction.
+ */
+class Int32SprintzDecoder : public SprintzDecoder {
+ public:
+    // Members are initialised in declaration order. The base constructor has
+    // already cleared the block state and the vector is value-initialised to
+    // zeros, so no further set-up is required.
+    Int32SprintzDecoder()
+        : fire_pred_(2),
+          pre_value_(0),
+          current_value_(0),
+          current_buffer_(block_size_ + 1),
+          predict_scheme_("fire") {}
+
+    ~Int32SprintzDecoder() override = default;
+
+    void set_predict_method(const std::string &method) {
+        predict_scheme_ = method;
+    }
+
+    /**
+     * True while the current block still holds undelivered values or the
+     * stream has unread bytes.
+     *
+     * A packed block can be as short as two bytes (header + one-byte var-uint
+     * first value, zero bit width), so any unread byte may start a block.
+     */
+    bool has_remaining(const common::ByteStream &in) override {
+        return has_pending_value() || in.has_remaining();
+    }
+
+    int read_boolean(bool &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int read_int64(int64_t &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_float(float &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_double(double &ret_value, common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+    int read_String(common::String &ret_value, common::PageArena &pa,
+                    common::ByteStream &in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    /**
+     * Returns the next decoded value, decoding a new block from `in` first
+     * when the previous one has been fully consumed.
+     *
+     * @return common::E_OK, or the error reported by decode_block().
+     */
+    int read_int32(int32_t &ret_value, common::ByteStream &in) override {
+        int ret = common::E_OK;
+        if (!is_block_read_) {
+            if (RET_FAIL(decode_block(in))) {
+                return ret;
+            }
+        }
+        ret_value = current_buffer_[current_count_++];
+        if (current_count_ == decode_size_) {
+            is_block_read_ = false;
+            current_count_ = 0;
+        }
+        return ret;
+    }
+
+    void reset() override {
+        SprintzDecoder::reset();
+        current_value_ = 0;
+        pre_value_ = 0;
+        current_count_ = 0;
+        std::fill(current_buffer_.begin(), current_buffer_.end(), 0);
+    }
+
+    /** Same contract as has_remaining(), kept for callers using has_next. */
+    bool has_next(common::ByteStream &input) {
+        return has_pending_value() || input.remaining_size() > 0;
+    }
+
+ protected:
+    /**
+     * Decodes one block from `input` into current_buffer_.
+     *
+     * @return common::E_OK on success; common::E_DECODE_ERR when the header is
+     *         out of range or the stream ends early; otherwise whatever
+     *         recalculate() reports.
+     */
+    int decode_block(common::ByteStream &input) override {
+        // read header bitWidth
+        int ret = common::E_OK;
+        uint8_t header = 0;
+        uint32_t read_len = 0;
+        ret = input.read_buf(&header, 1, read_len);
+        if (ret != common::E_OK || read_len != 1) {
+            return common::E_DECODE_ERR;
+        }
+        bit_width_ = static_cast<int32_t>(header);
+
+        if ((bit_width_ & (1 << 7)) != 0) {
+            // Trailing partial block: the low 7 bits carry the value count.
+            const int count = bit_width_ & ~(1 << 7);
+            // Reject counts that would leave nothing to return or would
+            // overrun current_buffer_ (corrupt input).
+            if (count <= 0 ||
+                static_cast<size_t>(count) > current_buffer_.size()) {
+                return common::E_DECODE_ERR;
+            }
+            decode_size_ = count;
+            Int32RleDecoder decoder;
+            for (int i = 0; i < decode_size_; ++i) {
+                current_buffer_[i] = decoder.read_int(input);
+            }
+        } else {
+            if (bit_width_ > 32) {
+                // Residuals are 32-bit; a wider header is corrupt.
+                return common::E_DECODE_ERR;
+            }
+            uint32_t tmp_prev_value = 0;
+            if (common::SerializationUtil::read_var_uint(
+                    tmp_prev_value, input) != common::E_OK) {
+                return common::E_DECODE_ERR;
+            }
+            pre_value_ = tmp_prev_value;
+            current_buffer_[0] = pre_value_;
+
+            std::vector<uint8_t> pack_buf(bit_width_);
+            if (bit_width_ > 0) {
+                read_len = 0;
+                ret = input.read_buf(
+                    reinterpret_cast<char *>(pack_buf.data()), bit_width_,
+                    read_len);
+                if (ret != common::E_OK ||
+                    read_len != static_cast<uint32_t>(bit_width_)) {
+                    return common::E_DECODE_ERR;
+                }
+            }
+
+            std::vector<int32_t> tmp_buffer(8);
+            packer_ = std::make_shared<Int32Packer>(bit_width_);
+            packer_->unpack_8values(pack_buf.data(), 0, tmp_buffer.data());
+
+            for (int i = 0; i < 8; ++i) {
+                current_buffer_[i + 1] = tmp_buffer[i];
+            }
+            decode_size_ = block_size_ + 1;
+            ret = recalculate();
+            if (ret != common::E_OK) {
+                return ret;
+            }
+        }
+        is_block_read_ = true;
+        return ret;
+    }
+
+    /**
+     * Undoes the zigzag mapping of current_buffer_[1..block_size_] and then
+     * rebuilds the original values with the configured predictor.
+     *
+     * @return common::E_DECODE_ERR for an unknown predict scheme.
+     */
+    int recalculate() override {
+        int ret = common::E_OK;
+        for (int i = 1; i <= block_size_; ++i) {
+            if (current_buffer_[i] % 2 == 0) {
+                current_buffer_[i] = -current_buffer_[i] / 2;
+            } else {
+                current_buffer_[i] = (current_buffer_[i] + 1) / 2;
+            }
+        }
+
+        if (predict_scheme_ == "delta") {
+            for (size_t i = 1; i < current_buffer_.size(); ++i) {
+                current_buffer_[i] += current_buffer_[i - 1];
+            }
+        } else if (predict_scheme_ == "fire") {
+            fire_pred_.reset();
+            for (int i = 1; i <= block_size_; ++i) {
+                int32_t pred = fire_pred_.predict(current_buffer_[i - 1]);
+                int32_t err = current_buffer_[i];
+                current_buffer_[i] = pred + err;
+                fire_pred_.train(current_buffer_[i - 1], current_buffer_[i],
+                                 err);
+            }
+        } else {
+            ret = common::E_DECODE_ERR;
+        }
+        return ret;
+    }
+
+ private:
+    // A decoded block holds decode_size_ values (block_size_ + 1 for a packed
+    // block), so the pending test must compare against decode_size_.
+    bool has_pending_value() const {
+        return is_block_read_ && current_count_ < decode_size_;
+    }
+
+    std::shared_ptr<Int32Packer> packer_;
+    IntFire fire_pred_;
+    int32_t pre_value_;
+    int32_t current_value_;
+    std::vector<int32_t> current_buffer_;
+    std::string predict_scheme_;
+};
+
+} // namespace storage
+
+#endif // INT32_SPRINTZ_DECODER_H
diff --git a/cpp/src/encoding/int32_sprintz_encoder.h
b/cpp/src/encoding/int32_sprintz_encoder.h
new file mode 100644
index 00000000..c7d38e26
--- /dev/null
+++ b/cpp/src/encoding/int32_sprintz_encoder.h
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INT32SPRINTZENCODER_H
+#define INT32SPRINTZENCODER_H
+
+#include <iostream>
+#include <memory>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "encode_utils.h"
+#include "encoding/encode_utils.h"
+#include "encoding/fire.h"
+#include "encoding/int32_rle_encoder.h"
+#include "int32_packer.h"
+#include "sprintz_encoder.h"
+
+namespace storage {
+/**
+ * Sprintz encoder for INT32 values.
+ *
+ * Values are buffered in values_. Once block_size_ + 1 values are cached, the
+ * first one is written as a var-uint and the following block_size_ values are
+ * replaced by zigzag-mapped prediction residuals, bit-packed and appended to
+ * byte_cache_. After group_max_ packed blocks the cache is flushed. Values
+ * that never fill a whole block are written by flush() through an
+ * Int32RleEncoder, preceded by one byte holding the count with its top bit
+ * set.
+ */
+class Int32SprintzEncoder : public SprintzEncoder {
+ public:
+    Int32SprintzEncoder() : SprintzEncoder(), fire_pred_(2) {}
+
+    ~Int32SprintzEncoder() override = default;
+
+    /** Clears the shared encoder state and every buffered value. */
+    void reset() override {
+        SprintzEncoder::reset();
+        values_.clear();
+    }
+
+    void destroy() override {}
+
+    int encode(bool value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(int64_t value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(float value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(double value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(common::String value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int get_one_item_max_size() override {
+        return 1 + (1 + block_size_) * sizeof(int32_t);
+    }
+
+    int get_max_byte_size() override {
+        return 1 + (values_.size() + 1) * sizeof(int32_t);
+    }
+
+    /**
+     * Buffers one value; packs a block when block_size_ + 1 values are cached
+     * and flushes to out_stream once group_max_ blocks have been packed.
+     *
+     * @return common::E_OK, or the error reported by flush().
+     */
+    int encode(int32_t value, common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+        values_.push_back(value);
+        if (!is_first_cached_) {
+            // The first value of a block is only remembered, never predicted.
+            is_first_cached_ = true;
+            return ret;
+        }
+
+        if (values_.size() == static_cast<size_t>(block_size_) + 1) {
+            int32_t prev = values_[0];
+            fire_pred_.reset();
+            for (int i = 1; i <= block_size_; ++i) {
+                // Residuals overwrite the values in place, so keep the
+                // original around as the next prediction input.
+                int32_t temp = values_[i];
+                values_[i] = predict(values_[i], prev);
+                prev = temp;
+            }
+
+            bit_pack();
+            is_first_cached_ = false;
+            values_.clear();
+            group_num_++;
+
+            if (group_num_ == group_max_) {
+                if (RET_FAIL(flush(out_stream))) {
+                    return ret;
+                }
+            }
+        }
+        return ret;
+    }
+
+    /**
+     * Writes all packed blocks from byte_cache_ to out_stream, followed by any
+     * values that did not fill a block (RLE-encoded behind a size byte whose
+     * top bit is set), then resets the encoder.
+     *
+     * @return common::E_OK, or the first error reported while copying the
+     *         cache or while encoding the trailing values.
+     */
+    int flush(common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+        if (byte_cache_.total_size() > 0) {
+            if (RET_FAIL(common::SerializationUtil::chunk_read_all_data(
+                    byte_cache_, out_stream))) {
+                return ret;
+            }
+        }
+
+        if (!values_.empty()) {
+            int size = static_cast<int>(values_.size());
+            size |= (1 << 7);  // set MSB
+
+            common::SerializationUtil::
+                write_int_little_endian_padded_on_bit_width(size, out_stream,
+                                                            1);
+            Int32RleEncoder encoder;
+            for (int32_t val : values_) {
+                // Stop at the first failure instead of silently dropping it.
+                if (RET_FAIL(encoder.encode(val, out_stream))) {
+                    break;
+                }
+            }
+            if (ret == common::E_OK) {
+                ret = encoder.flush(out_stream);
+            }
+        }
+
+        reset();
+        return ret;
+    }
+
+ protected:
+    /**
+     * Appends one packed block to byte_cache_: the bit width (1 byte), the
+     * first value as a var-uint and the bit-packed residuals.
+     */
+    void bit_pack() override {
+        int32_t pre_value = values_[0];
+        values_.erase(values_.begin());  // remove first value
+
+        bit_width_ = get_int32_max_bit_width(values_);
+        packer_ = std::make_shared<Int32Packer>(bit_width_);
+
+        std::vector<uint8_t> bytes(bit_width_);
+        std::vector<int32_t> tmp_buffer(values_.begin(),
+                                        values_.begin() + block_size_);
+        packer_->pack_8values(tmp_buffer.data(), 0, bytes.data());
+
+        common::SerializationUtil::write_int_little_endian_padded_on_bit_width(
+            bit_width_, byte_cache_, 1);
+        common::SerializationUtil::write_var_uint(pre_value, byte_cache_);
+        byte_cache_.write_buf(reinterpret_cast<const char*>(bytes.data()),
+                              bytes.size());
+    }
+
+    /**
+     * Returns the zigzag-mapped residual of `value` against the prediction
+     * derived from `prev`, using the configured predict method.
+     */
+    int32_t predict(int32_t value, int32_t prev) {
+        int32_t pred = 0;
+        if (predict_method_ == "delta") {
+            pred = delta(value, prev);
+        } else if (predict_method_ == "fire") {
+            pred = fire(value, prev);
+        } else {
+            // unsupported
+            ASSERT(false);
+        }
+
+        // Zigzag: r <= 0 -> -2r, r > 0 -> 2r - 1, evaluated on uint32_t so
+        // that large residuals wrap instead of overflowing a signed int.
+        const uint32_t bits = static_cast<uint32_t>(pred);
+        const uint32_t mapped = (pred <= 0) ? (0u - 2u * bits)
+                                            : (2u * bits - 1u);
+        return static_cast<int32_t>(mapped);
+    }
+
+    /** value - prev with wrap-around instead of signed overflow. */
+    int32_t delta(int32_t value, int32_t prev) {
+        return static_cast<int32_t>(static_cast<uint32_t>(value) -
+                                    static_cast<uint32_t>(prev));
+    }
+
+    /** Residual against the FIRE prediction; also trains the predictor. */
+    int32_t fire(int32_t value, int32_t prev) {
+        int32_t pred = fire_pred_.predict(prev);
+        int32_t err = static_cast<int32_t>(static_cast<uint32_t>(value) -
+                                           static_cast<uint32_t>(pred));
+        fire_pred_.train(prev, value, err);
+        return err;
+    }
+
+ private:
+    std::vector<int32_t> values_;  // block being built (values, then residuals)
+    std::shared_ptr<Int32Packer> packer_;
+    IntFire fire_pred_;
+};
+} // namespace storage
+
+#endif // INT32SPRINTZENCODER_H
diff --git a/cpp/src/encoding/int64_rle_decoder.h
b/cpp/src/encoding/int64_rle_decoder.h
index 6d54510b..7b98cc0c 100644
--- a/cpp/src/encoding/int64_rle_decoder.h
+++ b/cpp/src/encoding/int64_rle_decoder.h
@@ -55,7 +55,9 @@ class Int64RleDecoder : public Decoder {
tmp_buf_(nullptr) {}
~Int64RleDecoder() override { destroy(); }
- bool has_remaining() override { return has_next_package(); }
+ // Returns true while `buffer` still has unread bytes or has_next_package()
+ // reports more data.
+ bool has_remaining(const common::ByteStream &buffer) override {
+ return buffer.has_remaining() || has_next_package();
+ }
int read_boolean(bool &ret_value, common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
}
diff --git a/cpp/src/encoding/int64_sprintz_decoder.h
b/cpp/src/encoding/int64_sprintz_decoder.h
new file mode 100644
index 00000000..d1db9f94
--- /dev/null
+++ b/cpp/src/encoding/int64_sprintz_decoder.h
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INT64_SPRINTZ_DECODER_H
+#define INT64_SPRINTZ_DECODER_H
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "encoding/fire.h"
+#include "encoding/int64_packer.h"
+#include "encoding/int64_rle_decoder.h"
+#include "sprintz_decoder.h"
+
+namespace storage {
+
+/**
+ * Sprintz decoder for INT64 values.
+ *
+ * A block starts with one header byte. If its top bit is set, the low seven
+ * bits give the number of values that follow, decoded with Int64RleDecoder.
+ * Otherwise the byte is the bit width of a packed block: the first value read
+ * with read_i64, followed by 8 bit-packed zigzag residuals, giving
+ * block_size_ + 1 values after reconstruction.
+ */
+class Int64SprintzDecoder : public SprintzDecoder {
+ public:
+    // Members are initialised in declaration order. The base constructor has
+    // already cleared the block state and the vector is value-initialised to
+    // zeros, so no further set-up is required.
+    Int64SprintzDecoder()
+        : fire_pred_(3),
+          pre_value_(0),
+          current_value_(0),
+          current_buffer_(block_size_ + 1),
+          predict_scheme_("fire") {}
+
+    ~Int64SprintzDecoder() override = default;
+
+    void set_predict_method(const std::string& method) {
+        predict_scheme_ = method;
+    }
+
+    void reset() override {
+        SprintzDecoder::reset();
+        current_value_ = 0;
+        pre_value_ = 0;
+        current_count_ = 0;
+        std::fill(current_buffer_.begin(), current_buffer_.end(), 0);
+    }
+
+    /**
+     * True while the current block still holds undelivered values or the
+     * stream has unread bytes.
+     */
+    bool has_remaining(const common::ByteStream& in) override {
+        return has_pending_value() || in.has_remaining();
+    }
+
+    /** Same contract as has_remaining(), kept for callers using has_next. */
+    bool has_next(common::ByteStream& input) {
+        return has_pending_value() || input.remaining_size() > 0;
+    }
+
+    int read_int32(int32_t& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int read_boolean(bool& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int read_float(float& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int read_double(double& ret_value, common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int read_String(common::String& ret_value, common::PageArena& pa,
+                    common::ByteStream& in) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    /**
+     * Returns the next decoded value, decoding a new block from `in` first
+     * when the previous one has been fully consumed.
+     *
+     * @return common::E_OK, or the error reported by decode_block().
+     */
+    int read_int64(int64_t& ret_value, common::ByteStream& in) override {
+        int ret = common::E_OK;
+        if (!is_block_read_) {
+            if (RET_FAIL(decode_block(in))) {
+                return ret;
+            }
+        }
+        ret_value = current_buffer_[current_count_++];
+        if (current_count_ == decode_size_) {
+            is_block_read_ = false;
+            current_count_ = 0;
+        }
+        return ret;
+    }
+
+ protected:
+    /**
+     * Decodes one block from `input` into current_buffer_.
+     *
+     * @return common::E_OK on success; common::E_DECODE_ERR when the header is
+     *         out of range or the stream ends early; otherwise whatever
+     *         recalculate() reports.
+     */
+    int decode_block(common::ByteStream& input) override {
+        // read header bitWidth
+        int ret = common::E_OK;
+        uint8_t header = 0;
+        uint32_t read_len = 0;
+        ret = input.read_buf(&header, 1, read_len);
+        if (ret != common::E_OK || read_len != 1) {
+            return common::E_DECODE_ERR;
+        }
+        bit_width_ = static_cast<int32_t>(header);
+
+        if ((bit_width_ & (1 << 7)) != 0) {
+            // Trailing partial block: the low 7 bits carry the value count.
+            const int count = bit_width_ & ~(1 << 7);
+            // Reject counts that would leave nothing to return or would
+            // overrun current_buffer_ (corrupt input).
+            if (count <= 0 ||
+                static_cast<size_t>(count) > current_buffer_.size()) {
+                return common::E_DECODE_ERR;
+            }
+            decode_size_ = count;
+            Int64RleDecoder decoder;
+            for (int i = 0; i < decode_size_; ++i) {
+                current_buffer_[i] = decoder.read_int(input);
+            }
+        } else {
+            if (bit_width_ > 64) {
+                // Residuals are 64-bit; a wider header is corrupt.
+                return common::E_DECODE_ERR;
+            }
+            if (common::SerializationUtil::read_i64(pre_value_, input) !=
+                common::E_OK) {
+                return common::E_DECODE_ERR;
+            }
+            current_buffer_[0] = pre_value_;
+
+            // Read packed buffer
+            std::vector<uint8_t> pack_buf(bit_width_);
+            if (bit_width_ > 0) {
+                read_len = 0;
+                ret = input.read_buf(
+                    reinterpret_cast<char*>(pack_buf.data()), bit_width_,
+                    read_len);
+                if (ret != common::E_OK ||
+                    read_len != static_cast<uint32_t>(bit_width_)) {
+                    return common::E_DECODE_ERR;
+                }
+            }
+
+            std::vector<int64_t> tmp_buffer(8);
+            packer_ = std::make_shared<Int64Packer>(bit_width_);
+            packer_->unpack_8values(pack_buf.data(), 0, tmp_buffer.data());
+
+            for (int i = 0; i < 8; ++i) {
+                current_buffer_[i + 1] = tmp_buffer[i];
+            }
+
+            decode_size_ = block_size_ + 1;
+            ret = recalculate();
+            if (ret != common::E_OK) {
+                return ret;
+            }
+        }
+
+        is_block_read_ = true;
+        return ret;
+    }
+
+    /**
+     * Undoes the zigzag mapping of current_buffer_[1..block_size_] and then
+     * rebuilds the original values with the configured predictor.
+     *
+     * @return common::E_DECODE_ERR for an unknown predict scheme.
+     */
+    int recalculate() override {
+        int ret = common::E_OK;
+        for (int i = 1; i <= block_size_; ++i) {
+            if ((current_buffer_[i] & 1) == 0) {
+                current_buffer_[i] = -current_buffer_[i] / 2;
+            } else {
+                current_buffer_[i] = (current_buffer_[i] + 1) / 2;
+            }
+        }
+
+        if (predict_scheme_ == "delta") {
+            for (int i = 1; i <= block_size_; ++i) {
+                current_buffer_[i] += current_buffer_[i - 1];
+            }
+        } else if (predict_scheme_ == "fire") {
+            fire_pred_.reset();
+            for (int i = 1; i <= block_size_; ++i) {
+                int64_t pred = fire_pred_.predict(current_buffer_[i - 1]);
+                int64_t err = current_buffer_[i];
+                current_buffer_[i] = pred + err;
+                fire_pred_.train(current_buffer_[i - 1], current_buffer_[i],
+                                 err);
+            }
+        } else {
+            ret = common::E_DECODE_ERR;
+        }
+        return ret;
+    }
+
+ private:
+    // A decoded block holds decode_size_ values (block_size_ + 1 for a packed
+    // block), so the pending test must compare against decode_size_.
+    bool has_pending_value() const {
+        return is_block_read_ && current_count_ < decode_size_;
+    }
+
+    std::shared_ptr<Int64Packer> packer_;
+    LongFire fire_pred_;
+    int64_t pre_value_;
+    int64_t current_value_;
+    std::vector<int64_t> current_buffer_;
+    std::string predict_scheme_;
+};
+
+} // namespace storage
+
+#endif // INT64_SPRINTZ_DECODER_H
diff --git a/cpp/src/encoding/int64_sprintz_encoder.h
b/cpp/src/encoding/int64_sprintz_encoder.h
new file mode 100644
index 00000000..4c5ca876
--- /dev/null
+++ b/cpp/src/encoding/int64_sprintz_encoder.h
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef INT64_SPRINTZ_ENCODER_H
+#define INT64_SPRINTZ_ENCODER_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+#include "common/allocator/byte_stream.h"
+#include "encoding/encode_utils.h"
+#include "encoding/fire.h"
+#include "encoding/int64_packer.h"
+#include "encoding/int64_rle_encoder.h"
+#include "sprintz_encoder.h"
+
+namespace storage {
+
+/**
+ * Sprintz encoder for INT64 values.
+ *
+ * Values are buffered in values_. Once block_size_ + 1 values are cached, the
+ * first one is written with write_i64 and the following block_size_ values
+ * are replaced by zigzag-mapped prediction residuals, bit-packed and appended
+ * to byte_cache_. After group_max_ packed blocks the cache is flushed. Values
+ * that never fill a whole block are written by flush() through an
+ * Int64RleEncoder, preceded by one byte holding the count with its top bit
+ * set.
+ */
+class Int64SprintzEncoder : public SprintzEncoder {
+ public:
+    Int64SprintzEncoder() : SprintzEncoder(), fire_pred_(3) {}
+
+    ~Int64SprintzEncoder() override = default;
+
+    /** Clears the shared encoder state and every buffered value. */
+    void reset() override {
+        SprintzEncoder::reset();
+        values_.clear();
+    }
+
+    void destroy() override {}
+
+    int encode(int32_t value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(float value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(double value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(bool value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    int encode(common::String value, common::ByteStream& out_stream) override {
+        return common::E_TYPE_NOT_MATCH;
+    }
+
+    /**
+     * Buffers one value; packs a block when block_size_ + 1 values are cached
+     * and flushes to out_stream once group_max_ blocks have been packed.
+     *
+     * @return common::E_OK, or the error reported by flush().
+     */
+    int encode(int64_t value, common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+        values_.push_back(value);
+        if (!is_first_cached_) {
+            // The first value of a block is only remembered, never predicted.
+            is_first_cached_ = true;
+            return ret;
+        }
+
+        if (values_.size() == static_cast<size_t>(block_size_) + 1) {
+            int64_t prev = values_[0];
+            fire_pred_.reset();
+            for (int i = 1; i <= block_size_; ++i) {
+                // Residuals overwrite the values in place, so keep the
+                // original around as the next prediction input.
+                int64_t temp = values_[i];
+                values_[i] = predict(values_[i], prev);
+                prev = temp;
+            }
+
+            bit_pack();
+            is_first_cached_ = false;
+            values_.clear();
+            group_num_++;
+
+            if (group_num_ == group_max_) {
+                if (RET_FAIL(flush(out_stream))) {
+                    return ret;
+                }
+            }
+        }
+
+        return ret;
+    }
+
+    /**
+     * Writes all packed blocks from byte_cache_ to out_stream, followed by any
+     * values that did not fill a block (RLE-encoded behind a size byte whose
+     * top bit is set), then resets the encoder.
+     *
+     * @return common::E_OK, or the first error reported while copying the
+     *         cache or while encoding the trailing values.
+     */
+    int flush(common::ByteStream& out_stream) override {
+        int ret = common::E_OK;
+
+        if (byte_cache_.total_size() > 0) {
+            if (RET_FAIL(common::SerializationUtil::chunk_read_all_data(
+                    byte_cache_, out_stream))) {
+                return ret;
+            }
+        }
+
+        if (!values_.empty()) {
+            int size = static_cast<int>(values_.size());
+            size |= (1 << 7);
+
+            common::SerializationUtil::
+                write_int_little_endian_padded_on_bit_width(size, out_stream,
+                                                            1);
+
+            Int64RleEncoder encoder;
+            for (int64_t val : values_) {
+                // Stop at the first failure instead of silently dropping it.
+                if (RET_FAIL(encoder.encode(val, out_stream))) {
+                    break;
+                }
+            }
+            if (ret == common::E_OK) {
+                ret = encoder.flush(out_stream);
+            }
+        }
+
+        reset();
+        return ret;
+    }
+
+    int get_one_item_max_size() override {
+        return 1 + (1 + block_size_) * sizeof(int64_t);
+    }
+
+    int get_max_byte_size() override {
+        return 1 + (values_.size() + 1) * sizeof(int64_t);
+    }
+
+ protected:
+    /**
+     * Appends one packed block to byte_cache_: the bit width (1 byte), the
+     * first value via write_i64 and the bit-packed residuals.
+     */
+    void bit_pack() override {
+        int64_t pre_value = values_[0];
+        values_.erase(values_.begin());
+
+        bit_width_ = get_int64_max_bit_width(values_);
+        packer_ = std::make_shared<Int64Packer>(bit_width_);
+
+        std::vector<uint8_t> bytes(bit_width_);
+        std::vector<int64_t> tmp_buffer(values_.begin(),
+                                        values_.begin() + block_size_);
+        packer_->pack_8values(tmp_buffer.data(), 0, bytes.data());
+
+        common::SerializationUtil::write_int_little_endian_padded_on_bit_width(
+            bit_width_, byte_cache_, 1);
+        common::SerializationUtil::write_i64(pre_value, byte_cache_);
+        byte_cache_.write_buf(reinterpret_cast<const char*>(bytes.data()),
+                              bytes.size());
+    }
+
+    /**
+     * Returns the zigzag-mapped residual of `value` against the prediction
+     * derived from `prev`, using the configured predict method.
+     */
+    int64_t predict(int64_t value, int64_t prev) {
+        int64_t pred = 0;
+        if (predict_method_ == "delta") {
+            pred = delta(value, prev);
+        } else if (predict_method_ == "fire") {
+            pred = fire(value, prev);
+        } else {
+            ASSERT(false);
+        }
+
+        // Zigzag: r <= 0 -> -2r, r > 0 -> 2r - 1, evaluated on uint64_t so
+        // that large residuals wrap instead of overflowing a signed integer.
+        const uint64_t bits = static_cast<uint64_t>(pred);
+        const uint64_t two = 2;
+        const uint64_t mapped =
+            (pred <= 0) ? (uint64_t{0} - two * bits) : (two * bits - 1);
+        return static_cast<int64_t>(mapped);
+    }
+
+    /** value - prev with wrap-around instead of signed overflow. */
+    int64_t delta(int64_t value, int64_t prev) {
+        return static_cast<int64_t>(static_cast<uint64_t>(value) -
+                                    static_cast<uint64_t>(prev));
+    }
+
+    /** Residual against the FIRE prediction; also trains the predictor. */
+    int64_t fire(int64_t value, int64_t prev) {
+        int64_t pred = fire_pred_.predict(prev);
+        int64_t err = static_cast<int64_t>(static_cast<uint64_t>(value) -
+                                           static_cast<uint64_t>(pred));
+        fire_pred_.train(prev, value, err);
+        return err;
+    }
+
+ private:
+    std::vector<int64_t> values_;  // block being built (values, then residuals)
+    std::shared_ptr<Int64Packer> packer_;
+    LongFire fire_pred_;
+};
+
+} // namespace storage
+
+#endif // INT64_SPRINTZ_ENCODER_H
diff --git a/cpp/src/encoding/plain_decoder.h b/cpp/src/encoding/plain_decoder.h
index 42a555f5..d1c6969e 100644
--- a/cpp/src/encoding/plain_decoder.h
+++ b/cpp/src/encoding/plain_decoder.h
@@ -29,7 +29,9 @@ class PlainDecoder : public Decoder {
~PlainDecoder() override = default;
FORCE_INLINE void reset() { /* do nothing */
}
- FORCE_INLINE bool has_remaining() { return false; }
+ // Returns true while `buffer` still has unread bytes.
+ FORCE_INLINE bool has_remaining(const common::ByteStream &buffer) {
+ return buffer.has_remaining();
+ }
FORCE_INLINE int read_boolean(bool &ret_bool, common::ByteStream &in) {
return common::SerializationUtil::read_ui8((uint8_t &)ret_bool, in);
}
diff --git a/cpp/src/encoding/sprintz_decoder.h
b/cpp/src/encoding/sprintz_decoder.h
new file mode 100644
index 00000000..3ad30d4e
--- /dev/null
+++ b/cpp/src/encoding/sprintz_decoder.h
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef SPRINTZ_DECODER_H
+#define SPRINTZ_DECODER_H
+
+#include <cstdint>
+#include <iostream>
+#include <istream>
+#include <memory>
+#include <sstream>
+#include <stdexcept>
+#include <string>
+#include <vector>
+
+#include "decoder.h"
+
+namespace storage {
+
+/**
+ * Common base of the Sprintz decoders.
+ *
+ * Holds the per-block bookkeeping shared by the typed decoders; subclasses
+ * provide decode_block() and recalculate().
+ */
+class SprintzDecoder : public Decoder {
+ public:
+    ~SprintzDecoder() override = default;
+
+    // Reset decoder state: no block is loaded and the read position is 0.
+    void reset() override {
+        is_block_read_ = false;
+        current_count_ = 0;
+    }
+
+    // Decode a compressed block (to be implemented by subclasses)
+    virtual int decode_block(common::ByteStream& in) = 0;
+
+    // Update predictor based on decoded data (to be implemented by subclasses)
+    virtual int recalculate() = 0;
+
+ protected:
+    SprintzDecoder()
+        : bit_width_(0),
+          block_size_(8),
+          is_block_read_(false),
+          current_count_(0),
+          decode_size_(0) {}
+
+ protected:
+    int bit_width_;       // Current bit width being used
+    int block_size_;      // Default is 8
+    bool is_block_read_;  // Whether current block has been read
+    int current_count_;   // Current decoding position
+    int decode_size_;     // Number of valid data items in current decoded
+                          // block
+};
+
+} // namespace storage
+
+#endif // SPRINTZ_DECODER_H
\ No newline at end of file
diff --git a/cpp/src/encoding/sprintz_encoder.h
b/cpp/src/encoding/sprintz_encoder.h
new file mode 100644
index 00000000..67e4dae8
--- /dev/null
+++ b/cpp/src/encoding/sprintz_encoder.h
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef SPRINTZ_ENCODER_H
+#define SPRINTZ_ENCODER_H
+
+#include <sstream>
+#include <string>
+
+#include "decoder.h"
+
+namespace storage {
+/**
+ * Abstract base class for Sprintz encoders.
+ *
+ * It holds the state shared by the typed encoders (block/group counters, the
+ * current bit width, a byte cache and the prediction method name); the
+ * per-type work is left to subclasses through get_one_item_max_size() and
+ * bit_pack().
+ *
+ * NOTE(review): this header includes "decoder.h" only -- confirm that the
+ * Encoder base class is actually reachable through it.
+ */
+class SprintzEncoder : public Encoder {
+   public:
+    ~SprintzEncoder() override = default;
+
+    /** Selects the prediction method by name (the default is "fire"). */
+    void set_predict_method(const std::string& method) {
+        predict_method_ = method;
+    }
+
+    /**
+     * Clears the byte cache, forgets the cached first value and restarts the
+     * group counter. bit_width_ and predict_method_ are left untouched.
+     * NOTE(review): if Encoder declares a virtual reset(), this should be
+     * marked `override` -- confirm.
+     */
+    virtual void reset() {
+        byte_cache_.reset();
+        is_first_cached_ = false;
+        group_num_ = 0;
+    }
+
+    /** Implemented by subclasses. */
+    virtual int get_one_item_max_size() = 0;
+
+    /** Implemented by subclasses. */
+    virtual void bit_pack() = 0;
+
+   protected:
+    // Only subclasses may construct the base. The initializers are listed in
+    // member declaration order, which is the order they actually run in.
+    SprintzEncoder()
+        : block_size_(8),
+          group_max_(16),
+          group_num_(0),
+          bit_width_(0),
+          byte_cache_(1024, common::MOD_ENCODER_OBJ),
+          predict_method_("fire"),
+          is_first_cached_(false) {}
+
+    int block_size_;  // Size of each compressed block, default 8
+    int group_max_;   // Maximum number of groups, default 16
+    int group_num_;   // Current group count
+    int bit_width_;   // Current bit width being used
+    common::ByteStream byte_cache_{};
+    std::string
+        predict_method_{};  // Prediction method, e.g. "delta", "fire", etc.
+    bool is_first_cached_;  // Whether the first value has been cached
+};
+} // namespace storage
+
+#endif // SPRINTZ_ENCODER_H
diff --git a/cpp/src/encoding/ts2diff_decoder.h
b/cpp/src/encoding/ts2diff_decoder.h
index 2da8b611..a19e6163 100644
--- a/cpp/src/encoding/ts2diff_decoder.h
+++ b/cpp/src/encoding/ts2diff_decoder.h
@@ -48,7 +48,8 @@ class TS2DIFFDecoder : public Decoder {
current_index_ = 0;
}
+ // Reports whether more values can be read: true as soon as `buffer` still
+ // has unread bytes, otherwise decided by the decoder's own bit/index state.
- FORCE_INLINE bool has_remaining() {
+ FORCE_INLINE bool has_remaining(const common::ByteStream &buffer) {
+ if (buffer.has_remaining()) return true;
return bits_left_ != 0 || (current_index_ <= write_index_ &&
write_index_ != -1 && current_index_ != 0);
}
diff --git a/cpp/src/encoding/zigzag_decoder.h
b/cpp/src/encoding/zigzag_decoder.h
index 5b93951b..e12f7489 100644
--- a/cpp/src/encoding/zigzag_decoder.h
+++ b/cpp/src/encoding/zigzag_decoder.h
@@ -46,7 +46,9 @@ class ZigzagDecoder : public Decoder {
zigzag_decode_arr_ = nullptr;
}
+ // True while `buffer` still has unread bytes or list_transit_in_zd_ is
+ // not empty.
- bool has_remaining() override { return !list_transit_in_zd_.empty(); }
+ bool has_remaining(const common::ByteStream &buffer) override {
+ return buffer.has_remaining() || !list_transit_in_zd_.empty();
+ }
int read_boolean(bool &ret_value, common::ByteStream &in) override {
return common::E_TYPE_NOT_MATCH;
}
diff --git a/cpp/src/reader/aligned_chunk_reader.cc
b/cpp/src/reader/aligned_chunk_reader.cc
index 8a96ff75..9578ecca 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -521,9 +521,8 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
uint32_t mask = 1 << 7;
\
int64_t time = 0;
\
CppType value;
\
- while (
\
- (time_decoder_->has_remaining() || time_in.has_remaining()) &&
\
- (value_decoder_->has_remaining() || value_in.has_remaining())) {
\
+ while (time_decoder_->has_remaining(time_in) &&
\
+ value_decoder_->has_remaining(value_in)) {
\
cur_value_index++;
\
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] &
\
0xFF) &
\
@@ -566,8 +565,8 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
uint32_t mask = 1 << 7;
int64_t time = 0;
int32_t value;
- while ((time_decoder_->has_remaining() || time_in.has_remaining()) &&
- (value_decoder_->has_remaining() || value_in.has_remaining())) {
+ while (time_decoder_->has_remaining(time_in) &&
+ value_decoder_->has_remaining(value_in)) {
cur_value_index++;
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
(mask >> (cur_value_index % 8))) == 0) {
@@ -654,8 +653,8 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
int ret = E_OK;
int64_t time = 0;
common::String value;
- while (time_decoder_->has_remaining() || time_in.has_remaining()) {
- ASSERT(value_decoder_->has_remaining() || value_in.has_remaining());
+ while (time_decoder_->has_remaining(time_in)) {
+ ASSERT(value_decoder_->has_remaining(value_in));
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
break;
diff --git a/cpp/src/reader/aligned_chunk_reader.h
b/cpp/src/reader/aligned_chunk_reader.h
index 998409cd..7bf29047 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -105,12 +105,12 @@ class AlignedChunkReader : public IChunkReader {
Filter *filter,
common::PageArena *pa);
+ // True while the previous time page is not fully consumed: either the time
+ // decoder (when one exists) reports remaining data for time_in_, or
+ // time_in_ itself still has unread bytes.
bool prev_time_page_not_finish() const {
- return (time_decoder_ && time_decoder_->has_remaining()) ||
+ return (time_decoder_ && time_decoder_->has_remaining(time_in_)) ||
time_in_.has_remaining();
}
+ // True while the previous value page is not fully consumed: either the
+ // value decoder (when one exists) reports remaining data for value_in_, or
+ // value_in_ itself still has unread bytes.
bool prev_value_page_not_finish() const {
- return (value_decoder_ && value_decoder_->has_remaining()) ||
+ return (value_decoder_ && value_decoder_->has_remaining(value_in_)) ||
value_in_.has_remaining();
}
diff --git a/cpp/src/reader/chunk_reader.cc b/cpp/src/reader/chunk_reader.cc
index fad6cf39..d4ab50f1 100644
--- a/cpp/src/reader/chunk_reader.cc
+++ b/cpp/src/reader/chunk_reader.cc
@@ -367,9 +367,8 @@ int ChunkReader::decode_cur_page_data(TsBlock
*&ret_tsblock, Filter *filter,
do {
\
int64_t time = 0;
\
CppType value;
\
- while (time_decoder_->has_remaining() || time_in.has_remaining()) {
\
- ASSERT(value_decoder_->has_remaining() ||
\
- value_in.has_remaining());
\
+ while (time_decoder_->has_remaining(time_in)) {
\
+ ASSERT(value_decoder_->has_remaining(value_in));
\
if (UNLIKELY(!row_appender.add_row())) {
\
ret = E_OVERFLOW;
\
break;
\
@@ -396,8 +395,8 @@ int
ChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(ByteStream &time_in,
do {
int64_t time = 0;
int32_t value;
- while (time_decoder_->has_remaining() || time_in.has_remaining()) {
- ASSERT(value_decoder_->has_remaining() ||
value_in.has_remaining());
+ while (time_decoder_->has_remaining(time_in)) {
+ ASSERT(value_decoder_->has_remaining(value_in));
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
break;
@@ -425,8 +424,8 @@ int
ChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(ByteStream &time_in,
int ret = E_OK;
int64_t time = 0;
common::String value;
- while (time_decoder_->has_remaining() || time_in.has_remaining()) {
- ASSERT(value_decoder_->has_remaining() || value_in.has_remaining());
+ while (time_decoder_->has_remaining(time_in)) {
+ ASSERT(value_decoder_->has_remaining(value_in));
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
break;
diff --git a/cpp/src/reader/chunk_reader.h b/cpp/src/reader/chunk_reader.h
index edc968a9..23be4847 100644
--- a/cpp/src/reader/chunk_reader.h
+++ b/cpp/src/reader/chunk_reader.h
@@ -86,7 +86,7 @@ class ChunkReader : public IChunkReader {
int decode_cur_page_data(common::TsBlock *&ret_tsblock, Filter *filter,
common::PageArena &pa);
+ // True while the previous page is not fully consumed: either the time
+ // decoder (when one exists) reports remaining data for time_in_, or
+ // time_in_ itself still has unread bytes.
bool prev_page_not_finish() const {
- return (time_decoder_ && time_decoder_->has_remaining()) ||
+ return (time_decoder_ && time_decoder_->has_remaining(time_in_)) ||
time_in_.has_remaining();
}
diff --git a/cpp/test/common/allocator/byte_stream_test.cc
b/cpp/test/common/allocator/byte_stream_test.cc
index b7177d16..6296e3a5 100644
--- a/cpp/test/common/allocator/byte_stream_test.cc
+++ b/cpp/test/common/allocator/byte_stream_test.cc
@@ -283,4 +283,36 @@ TEST_F(SerializationUtilTest, WriteReadString) {
EXPECT_EQ(value_to_write, value_read);
}
+// A bit width of 40 must be rejected with E_TSFILE_CORRUPTED by both the
+// padded little-endian int writer and the matching reader.
+TEST_F(SerializationUtilTest, WriteReadIntLEPaddedBitWidth_BitWidthTooLarge) {
+    const int oversized_bit_width = 40;
+
+    // Write side.
+    int32_t value = 123;
+    const auto write_status =
+        SerializationUtil::write_int_little_endian_padded_on_bit_width(
+            value, *byte_stream_, oversized_bit_width);
+    EXPECT_EQ(write_status, common::E_TSFILE_CORRUPTED);
+
+    // Read side, starting again from a reset stream.
+    byte_stream_->reset();
+    int32_t read_val = 0;
+    const auto read_status =
+        SerializationUtil::read_int_little_endian_padded_on_bit_width(
+            *byte_stream_, oversized_bit_width, read_val);
+    EXPECT_EQ(read_status, common::E_TSFILE_CORRUPTED);
+}
+
+// Round-trips a set of int32 values (including INT32_MIN / INT32_MAX) through
+// the padded little-endian writer and reader at a bit width of 32 and checks
+// that every value is read back unchanged.
+TEST_F(SerializationUtilTest, WriteReadIntLEPaddedBitWidthBoundaryValue) {
+    std::vector<int32_t> test_values = {
+        132100, 1, -1, 12345678, -87654321, INT32_MAX, INT32_MIN};
+    int bit_width = 32;
+    for (int32_t original_value : test_values) {
+        // Each value gets a fresh stream so reads start at the written bytes.
+        byte_stream_->reset();
+        EXPECT_EQ(
+            SerializationUtil::write_int_little_endian_padded_on_bit_width(
+                original_value, *byte_stream_, bit_width),
+            common::E_OK);
+        int32_t read_value = 0;
+        EXPECT_EQ(
+            SerializationUtil::read_int_little_endian_padded_on_bit_width(
+                *byte_stream_, bit_width, read_value),
+            common::E_OK);
+        EXPECT_EQ(read_value, original_value)
+            << "Mismatch with bit_width = " << bit_width;
+    }
+}
+
} // namespace common
\ No newline at end of file
diff --git a/cpp/test/encoding/sprintz_codec_test.cc
b/cpp/test/encoding/sprintz_codec_test.cc
new file mode 100644
index 00000000..ec43ff40
--- /dev/null
+++ b/cpp/test/encoding/sprintz_codec_test.cc
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include <gtest/gtest.h>
+
+#include <cfloat>
+#include <climits>
+#include <cmath>
+
+#include "common/allocator/byte_stream.h"
+#include "encoding/double_sprintz_decoder.h"
+#include "encoding/double_sprintz_encoder.h"
+#include "encoding/float_sprintz_decoder.h"
+#include "encoding/float_sprintz_encoder.h"
+#include "encoding/int32_sprintz_decoder.h"
+#include "encoding/int32_sprintz_encoder.h"
+#include "encoding/int64_sprintz_decoder.h"
+#include "encoding/int64_sprintz_encoder.h"
+
+using namespace storage;
+using namespace common;
+
+namespace {
+
+// Divisors applied to the integer samples in PrepareHybridData() to derive
+// the float and double samples.
+constexpr int float_max_point_value = 10000;
+constexpr int64_t double_max_point_value = 1000000000000000LL;
+
+// Sample series filled once by PrepareHybridData(), one per data type.
+std::vector<int32_t> int_list;
+std::vector<int64_t> long_list;
+std::vector<float> float_list;
+std::vector<double> double_list;
+// Numbers of values encoded per run by the *Increasing tests.
+std::vector<int> iterations = {1, 3, 8, 16, 1000, 10000};
+
+// Fills int_list / long_list / float_list / double_list with 50 rounds of
+// samples. Each round is a flat run (the same value repeated) followed by a
+// ramp of the same length whose value grows by 3 after every sample; the run
+// length starts at 11 and grows by 2 per round, and the value starts at 2000.
+void PrepareHybridData() {
+    int sample = 2000;
+
+    // Appends the current sample to all four series; the float and double
+    // series store it divided by their respective max-point divisor.
+    const auto append_sample = [&sample]() {
+        float_list.push_back(static_cast<float>(sample) /
+                             float_max_point_value);
+        double_list.push_back(static_cast<double>(sample) /
+                              double_max_point_value);
+        int_list.push_back(sample);
+        long_list.push_back(sample);
+    };
+
+    const int round_total = 50;
+    int run_length = 11;
+    for (int round = 0; round < round_total; ++round, run_length += 2) {
+        // Flat run: the value does not change.
+        for (int k = 0; k < run_length; ++k) {
+            append_sample();
+        }
+        // Ramp: step the value after each sample.
+        for (int k = 0; k < run_length; ++k) {
+            append_sample();
+            sample += 3;
+        }
+    }
+}
+
+// Fixture for the Sprintz codec tests; it makes sure the shared sample series
+// are generated exactly once (the first time int_list is found empty).
+class SprintzCodecTest : public ::testing::Test {
+   protected:
+    void SetUp() override {
+        if (!int_list.empty()) {
+            return;
+        }
+        PrepareHybridData();
+    }
+};
+
+// Encodes a single int32, flushes, and expects the decoder to return exactly
+// that value and then report no remaining data.
+TEST_F(SprintzCodecTest, Int32SingleValue) {
+    const int32_t expected = 777;
+
+    ByteStream buffer(128, MOD_ENCODER_OBJ);
+    Int32SprintzEncoder encoder;
+    ASSERT_EQ(encoder.encode(expected, buffer), E_OK);
+    ASSERT_EQ(encoder.flush(buffer), E_OK);
+
+    Int32SprintzDecoder decoder;
+    int32_t decoded;
+    ASSERT_TRUE(decoder.has_remaining(buffer));
+    ASSERT_EQ(decoder.read_int32(decoded, buffer), E_OK);
+    ASSERT_EQ(decoded, expected);
+    ASSERT_FALSE(decoder.has_remaining(buffer));
+}
+
+// Encodes a single int64 that does not fit in 32 bits (INT32_MAX + 10),
+// flushes, and expects the decoder to return exactly that value and then
+// report no remaining data.
+TEST_F(SprintzCodecTest, Int64SingleValue) {
+    const int64_t expected = static_cast<int64_t>(INT32_MAX) + 10;
+
+    ByteStream buffer(128, MOD_ENCODER_OBJ);
+    Int64SprintzEncoder encoder;
+    ASSERT_EQ(encoder.encode(expected, buffer), E_OK);
+    ASSERT_EQ(encoder.flush(buffer), E_OK);
+
+    Int64SprintzDecoder decoder;
+    int64_t decoded;
+    ASSERT_TRUE(decoder.has_remaining(buffer));
+    ASSERT_EQ(decoder.read_int64(decoded, buffer), E_OK);
+    ASSERT_EQ(decoded, expected);
+    ASSERT_FALSE(decoder.has_remaining(buffer));
+}
+
+// Round-trips the int32 boundary values {INT32_MIN, -1, 0, 1, INT32_MAX}.
+// Encoder status codes are asserted (as in Int32SingleValue) so that an
+// encode/flush failure is reported where it happens instead of surfacing
+// later as a decode mismatch.
+TEST_F(SprintzCodecTest, Int32EdgeValues) {
+    std::vector<int32_t> values = {INT32_MIN, -1, 0, 1, INT32_MAX};
+
+    Int32SprintzEncoder encoder;
+    ByteStream stream(128, MOD_ENCODER_OBJ);
+    for (auto v : values) {
+        ASSERT_EQ(encoder.encode(v, stream), E_OK);
+    }
+    ASSERT_EQ(encoder.flush(stream), E_OK);
+
+    Int32SprintzDecoder decoder;
+    for (auto expected : values) {
+        int32_t actual = 0;
+        ASSERT_TRUE(decoder.has_remaining(stream));
+        ASSERT_EQ(decoder.read_int32(actual, stream), E_OK);
+        ASSERT_EQ(actual, expected);
+    }
+    // Everything written must have been consumed.
+    ASSERT_FALSE(decoder.has_remaining(stream));
+}
+
+// Round-trips the int64 boundary values {INT64_MIN, -1, 0, 1, INT64_MAX}.
+// Encoder status codes are asserted (as in Int64SingleValue) so that an
+// encode/flush failure is reported where it happens instead of surfacing
+// later as a decode mismatch.
+TEST_F(SprintzCodecTest, Int64EdgeValues) {
+    std::vector<int64_t> values = {INT64_MIN, -1, 0, 1, INT64_MAX};
+
+    Int64SprintzEncoder encoder;
+    ByteStream stream(128, MOD_ENCODER_OBJ);
+    for (auto v : values) {
+        ASSERT_EQ(encoder.encode(v, stream), E_OK);
+    }
+    ASSERT_EQ(encoder.flush(stream), E_OK);
+
+    Int64SprintzDecoder decoder;
+    for (auto expected : values) {
+        int64_t actual = 0;
+        ASSERT_TRUE(decoder.has_remaining(stream));
+        ASSERT_EQ(decoder.read_int64(actual, stream), E_OK);
+        ASSERT_EQ(actual, expected);
+    }
+    // Everything written must have been consumed.
+    ASSERT_FALSE(decoder.has_remaining(stream));
+}
+
+// Writes two separately flushed segments of three int32 zeros into one
+// stream, then reads each segment back with a fresh decoder. Encoder status
+// codes are asserted so a failed encode/flush is reported at its source.
+TEST_F(SprintzCodecTest, Int32ZeroNumber) {
+    Int32SprintzEncoder encoder;
+    ByteStream stream(128, MOD_ENCODER_OBJ);
+    for (int segment = 0; segment < 2; ++segment) {
+        for (int i = 0; i < 3; ++i) {
+            ASSERT_EQ(encoder.encode(0, stream), E_OK);
+        }
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+    }
+
+    for (int round = 0; round < 2; ++round) {
+        // One decoder per flushed segment.
+        Int32SprintzDecoder decoder;
+        for (int i = 0; i < 3; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            int32_t actual = -1;  // sentinel: must be overwritten with 0
+            ASSERT_EQ(decoder.read_int32(actual, stream), E_OK);
+            ASSERT_EQ(actual, 0);
+        }
+    }
+}
+
+// Writes two separately flushed segments of three int64 zeros into one
+// stream, then reads each segment back with a fresh decoder. Encoder status
+// codes are asserted so a failed encode/flush is reported at its source.
+TEST_F(SprintzCodecTest, Int64ZeroNumber) {
+    Int64SprintzEncoder encoder;
+    ByteStream stream(128, MOD_ENCODER_OBJ);
+    for (int segment = 0; segment < 2; ++segment) {
+        for (int i = 0; i < 3; ++i) {
+            // The cast selects the int64 overload for the literal 0.
+            ASSERT_EQ(encoder.encode(static_cast<int64_t>(0), stream), E_OK);
+        }
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+    }
+
+    for (int round = 0; round < 2; ++round) {
+        // One decoder per flushed segment.
+        Int64SprintzDecoder decoder;
+        for (int i = 0; i < 3; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            int64_t actual = -1;  // sentinel: must be overwritten with 0
+            ASSERT_EQ(decoder.read_int64(actual, stream), E_OK);
+            ASSERT_EQ(actual, 0);
+        }
+    }
+}
+
+// For every count in `iterations`, round-trips the arithmetic sequence
+// 7, 9, 11, ... as int32 and checks the decoder is exhausted afterwards.
+// Encoder status codes are asserted so a failed encode/flush is reported at
+// its source rather than as a later decode mismatch.
+TEST_F(SprintzCodecTest, Int32Increasing) {
+    for (int num : iterations) {
+        Int32SprintzEncoder encoder;
+        ByteStream stream(1024, MOD_ENCODER_OBJ);
+        for (int i = 0; i < num; ++i) {
+            ASSERT_EQ(encoder.encode(7 + 2 * i, stream), E_OK);
+        }
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+
+        Int32SprintzDecoder decoder;
+        for (int i = 0; i < num; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            int32_t actual = 0;
+            ASSERT_EQ(decoder.read_int32(actual, stream), E_OK);
+            ASSERT_EQ(actual, 7 + 2 * i);
+        }
+        ASSERT_FALSE(decoder.has_remaining(stream));
+    }
+}
+
+// For every count in `iterations`, round-trips the arithmetic sequence
+// 7, 9, 11, ... as int64 and checks the decoder is exhausted afterwards.
+// Encoder status codes are asserted so a failed encode/flush is reported at
+// its source rather than as a later decode mismatch.
+TEST_F(SprintzCodecTest, Int64Increasing) {
+    for (int num : iterations) {
+        Int64SprintzEncoder encoder;
+        ByteStream stream(1024, MOD_ENCODER_OBJ);
+        for (int i = 0; i < num; ++i) {
+            // The cast makes the whole expression int64 so the int64
+            // overload of encode() is chosen.
+            ASSERT_EQ(
+                encoder.encode(static_cast<int64_t>(7) + 2 * i, stream),
+                E_OK);
+        }
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+
+        Int64SprintzDecoder decoder;
+        for (int i = 0; i < num; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            int64_t actual = 0;
+            ASSERT_EQ(decoder.read_int64(actual, stream), E_OK);
+            ASSERT_EQ(actual, 7 + 2 * i);
+        }
+        ASSERT_FALSE(decoder.has_remaining(stream));
+    }
+}
+
+// Encodes FLT_MAX as a single float, flushes, and expects the decoder to
+// return exactly that value and then report no remaining data.
+TEST_F(SprintzCodecTest, FloatSingleValue) {
+    const float expected = FLT_MAX;
+
+    ByteStream buffer(128, MOD_ENCODER_OBJ);
+    FloatSprintzEncoder encoder;
+    ASSERT_EQ(encoder.encode(expected, buffer), E_OK);
+    ASSERT_EQ(encoder.flush(buffer), E_OK);
+
+    FloatSprintzDecoder decoder;
+    float decoded;
+    ASSERT_TRUE(decoder.has_remaining(buffer));
+    ASSERT_EQ(decoder.read_float(decoded, buffer), E_OK);
+    ASSERT_EQ(decoded, expected);
+    ASSERT_FALSE(decoder.has_remaining(buffer));
+}
+
+// Encodes DBL_MAX as a single double, flushes, and expects the decoder to
+// return exactly that value and then report no remaining data.
+TEST_F(SprintzCodecTest, DoubleSingleValue) {
+    const double expected = DBL_MAX;
+
+    ByteStream buffer(128, MOD_ENCODER_OBJ);
+    DoubleSprintzEncoder encoder;
+    ASSERT_EQ(encoder.encode(expected, buffer), E_OK);
+    ASSERT_EQ(encoder.flush(buffer), E_OK);
+
+    DoubleSprintzDecoder decoder;
+    double decoded;
+    ASSERT_TRUE(decoder.has_remaining(buffer));
+    ASSERT_EQ(decoder.read_double(decoded, buffer), E_OK);
+    ASSERT_EQ(decoded, expected);
+    ASSERT_FALSE(decoder.has_remaining(buffer));
+}
+
+// Writes two separately flushed segments of three 0.0f values into one
+// stream, then reads each segment back with a fresh decoder. Encoder status
+// codes are asserted so a failed encode/flush is reported at its source.
+TEST_F(SprintzCodecTest, FloatZeroNumber) {
+    FloatSprintzEncoder encoder;
+    ByteStream stream(128, MOD_ENCODER_OBJ);
+    float value = 0.0f;
+    for (int segment = 0; segment < 2; ++segment) {
+        for (int i = 0; i < 3; ++i) {
+            ASSERT_EQ(encoder.encode(value, stream), E_OK);
+        }
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+    }
+
+    for (int round = 0; round < 2; ++round) {
+        // One decoder per flushed segment.
+        FloatSprintzDecoder decoder;
+        for (int i = 0; i < 3; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            float actual = -1.0f;  // sentinel: must be overwritten with 0
+            ASSERT_EQ(decoder.read_float(actual, stream), E_OK);
+            ASSERT_EQ(actual, value);
+        }
+    }
+}
+
+// Writes two separately flushed segments of three 0.0 values into one
+// stream, then reads each segment back with a fresh decoder. Encoder status
+// codes are asserted so a failed encode/flush is reported at its source.
+TEST_F(SprintzCodecTest, DoubleZeroNumber) {
+    DoubleSprintzEncoder encoder;
+    ByteStream stream(128, MOD_ENCODER_OBJ);
+    double value = 0.0;
+    for (int segment = 0; segment < 2; ++segment) {
+        for (int i = 0; i < 3; ++i) {
+            ASSERT_EQ(encoder.encode(value, stream), E_OK);
+        }
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+    }
+
+    for (int round = 0; round < 2; ++round) {
+        // One decoder per flushed segment.
+        DoubleSprintzDecoder decoder;
+        for (int i = 0; i < 3; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            double actual = -1.0;  // sentinel: must be overwritten with 0
+            ASSERT_EQ(decoder.read_double(actual, stream), E_OK);
+            ASSERT_EQ(actual, value);
+        }
+    }
+}
+
+// For every count in `iterations`, round-trips the float sequence
+// 7.101, 9.101, 11.101, ... and checks the decoder is exhausted afterwards.
+// Encoder status codes are asserted so a failed encode/flush is reported at
+// its source rather than as a later decode mismatch.
+TEST_F(SprintzCodecTest, FloatIncreasing) {
+    for (int num : iterations) {
+        FloatSprintzEncoder encoder;
+        ByteStream stream(1024, MOD_ENCODER_OBJ);
+        float value = 7.101f;
+        for (int i = 0; i < num; ++i) {
+            ASSERT_EQ(encoder.encode(value + 2.0f * i, stream), E_OK);
+        }
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+
+        FloatSprintzDecoder decoder;
+        for (int i = 0; i < num; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            float actual = 0.0f;
+            ASSERT_EQ(decoder.read_float(actual, stream), E_OK);
+            ASSERT_FLOAT_EQ(actual, value + 2 * i);
+        }
+        ASSERT_FALSE(decoder.has_remaining(stream));
+    }
+}
+
+// For every count in `iterations`, round-trips an increasing double sequence
+// (step 2.0) and checks the decoder is exhausted afterwards. Encoder status
+// codes are asserted so a failed encode/flush is reported at its source
+// rather than as a later decode mismatch.
+TEST_F(SprintzCodecTest, DoubleIncreasing) {
+    for (int num : iterations) {
+        DoubleSprintzEncoder encoder;
+        ByteStream stream(1024, MOD_ENCODER_OBJ);
+        // The start value is the float 7.101f widened to double.
+        float f = 7.101f;
+        double value = static_cast<double>(f);
+        for (int i = 0; i < num; ++i) {
+            double input_val = value + 2.0 * i;
+            ASSERT_EQ(encoder.encode(input_val, stream), E_OK);
+        }
+
+        ASSERT_EQ(encoder.flush(stream), E_OK);
+
+        DoubleSprintzDecoder decoder;
+        for (int i = 0; i < num; ++i) {
+            ASSERT_TRUE(decoder.has_remaining(stream));
+            double actual = 0.0;
+            ASSERT_EQ(decoder.read_double(actual, stream), E_OK);
+            ASSERT_DOUBLE_EQ(actual, value + 2 * i);
+        }
+        ASSERT_FALSE(decoder.has_remaining(stream));
+    }
+}
+
+// Round-trips extreme float values: +/-FLT_MIN, +/-FLT_MAX, both zeros and a
+// NaN. Encoder status codes are asserted so a failed encode/flush is
+// reported at its source rather than as a later decode mismatch.
+TEST_F(SprintzCodecTest, FloatExtremeValues) {
+    std::vector<float> test_vals = {FLT_MIN, FLT_MAX, -FLT_MIN, -FLT_MAX,
+                                    -0.0f,   0.0f,    std::nanf("1")};
+
+    FloatSprintzEncoder encoder;
+    ByteStream stream(256, MOD_ENCODER_OBJ);
+    for (auto v : test_vals) {
+        ASSERT_EQ(encoder.encode(v, stream), E_OK);
+    }
+    ASSERT_EQ(encoder.flush(stream), E_OK);
+
+    FloatSprintzDecoder decoder;
+    for (auto expected : test_vals) {
+        float actual = 0.0f;
+        ASSERT_TRUE(decoder.has_remaining(stream));
+        ASSERT_EQ(decoder.read_float(actual, stream), E_OK);
+        if (std::isnan(expected)) {
+            // NaN never compares equal to itself, so only NaN-ness is
+            // checked.
+            ASSERT_TRUE(std::isnan(actual));
+        } else {
+            ASSERT_FLOAT_EQ(actual, expected);
+        }
+    }
+    ASSERT_FALSE(decoder.has_remaining(stream));
+}
+
+// Round-trips extreme double values: +/-DBL_MIN, +/-DBL_MAX, both zeros and
+// a NaN. Encoder status codes are asserted so a failed encode/flush is
+// reported at its source rather than as a later decode mismatch.
+TEST_F(SprintzCodecTest, DoubleExtremeValues) {
+    std::vector<double> test_vals = {DBL_MIN, DBL_MAX, -DBL_MIN, -DBL_MAX,
+                                     -0.0,    0.0,     std::nan("")};
+
+    DoubleSprintzEncoder encoder;
+    ByteStream stream(256, MOD_ENCODER_OBJ);
+    for (auto v : test_vals) {
+        ASSERT_EQ(encoder.encode(v, stream), E_OK);
+    }
+    ASSERT_EQ(encoder.flush(stream), E_OK);
+
+    DoubleSprintzDecoder decoder;
+    for (auto expected : test_vals) {
+        double actual = 0.0;
+        ASSERT_TRUE(decoder.has_remaining(stream));
+        ASSERT_EQ(decoder.read_double(actual, stream), E_OK);
+        if (std::isnan(expected)) {
+            // NaN never compares equal to itself, so only NaN-ness is
+            // checked.
+            ASSERT_TRUE(std::isnan(actual));
+        } else {
+            ASSERT_DOUBLE_EQ(actual, expected);
+        }
+    }
+    ASSERT_FALSE(decoder.has_remaining(stream));
+}
+
+} // namespace
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 43b00dbe..2bc9fd9a 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -885,12 +885,15 @@ TEST_F(TsFileWriterTableTest, DiffCodecTypes) {
std::vector<std::string> measurement_names = {
"int32_zigzag", "int64_zigzag", "string_dic", "text_dic",
"float_gorilla", "double_gorilla", "int32_ts2diff", "int64_ts2diff",
- "int32_rle", "int64_rle"};
+ "int32_rle", "int64_rle", "int32_sprintz", "int64_sprintz",
+ "float_sprintz", "double_sprintz",
+ };
std::vector<common::TSDataType> data_types = {
- INT32, INT64, STRING, TEXT, FLOAT, DOUBLE, INT32, INT64, INT32, INT64};
+ INT32, INT64, STRING, TEXT, FLOAT, DOUBLE, INT32,
+ INT64, INT32, INT64, INT32, INT64, FLOAT, DOUBLE};
std::vector<common::TSEncoding> encodings = {
- ZIGZAG, ZIGZAG, DICTIONARY, DICTIONARY, GORILLA,
- GORILLA, TS_2DIFF, TS_2DIFF, RLE, RLE};
+ ZIGZAG, ZIGZAG, DICTIONARY, DICTIONARY, GORILLA, GORILLA, TS_2DIFF,
+ TS_2DIFF, RLE, RLE, SPRINTZ, SPRINTZ, SPRINTZ, SPRINTZ};
for (int i = 0; i < measurement_names.size(); i++) {
measurement_schemas.emplace_back(new MeasurementSchema(
@@ -974,6 +977,11 @@ TEST_F(TsFileWriterTableTest, DiffCodecTypes) {
ASSERT_EQ(table_result_set->get_value<int32_t>(10), 32);
ASSERT_EQ(table_result_set->get_value<int64_t>(11), 64);
+ // SPRINTZ
+ ASSERT_EQ(table_result_set->get_value<int32_t>(12), 32);
+ ASSERT_EQ(table_result_set->get_value<int64_t>(13), 64);
+ ASSERT_FLOAT_EQ(table_result_set->get_value<float>(14), (float)1.0);
+ ASSERT_DOUBLE_EQ(table_result_set->get_value<double>(15), (double)2.0);
}
reader.destroy_query_data_set(table_result_set);
ASSERT_EQ(reader.close(), common::E_OK);