This is an automated email from the ASF dual-hosted git repository.
hongzhigao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 434f0cb7 Fix bugs and optimize TS2diff encoding. (#428)
434f0cb7 is described below
commit 434f0cb73473fa35b1004699f8deedcffb571a42
Author: Hongzhi Gao <[email protected]>
AuthorDate: Tue Mar 4 18:21:36 2025 +0800
Fix bugs and optimize TS2diff encoding. (#428)
* ts2diff SIMD
* fix snappy_compressor.cc
* fix page_writer_max_point_num_
* Fix the issue of reading large pages.
* Fix the issue of reading large pages.
* Fix some issues
* Fix snappy_compressor.cc
---
cpp/src/common/global.cc | 2 +-
cpp/src/encoding/ts2diff_encoder.h | 66 ++++++++++++++++++++++++++++++---
cpp/src/reader/chunk_reader.cc | 6 +--
cpp/src/writer/tsfile_writer.h | 5 +--
cpp/test/encoding/ts2diff_codec_test.cc | 59 +++++++++++++++++++++++++++++
cpp/test/writer/tsfile_writer_test.cc | 2 +-
cpp/third_party/google_snappy/snappy.cc | 4 ++
7 files changed, 129 insertions(+), 15 deletions(-)
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index 3fbe1872..7196633b 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -35,7 +35,7 @@ void init_config_value() {
g_config_value_.tsblock_mem_inc_step_size_ = 8000; // 8k
g_config_value_.tsblock_max_memory_ = 64000; // 64k
// g_config_value_.tsblock_max_memory_ = 32;
- g_config_value_.page_writer_max_point_num_ = 5;
+ g_config_value_.page_writer_max_point_num_ = 10000;
g_config_value_.page_writer_max_memory_bytes_ = 128 * 1024; // 128 k
g_config_value_.max_degree_of_index_node_ = 256;
g_config_value_.tsfile_index_bloom_filter_error_percent_ = 0.05;
diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h
index a1c5a534..d2da14ca 100644
--- a/cpp/src/encoding/ts2diff_encoder.h
+++ b/cpp/src/encoding/ts2diff_encoder.h
@@ -25,9 +25,67 @@
#include "common/allocator/alloc_base.h"
#include "common/allocator/byte_stream.h"
#include "encoder.h"
+#if defined(__SSE4_2__)
+#include <smmintrin.h>
+#define USE_SSE 1
+#elif defined(__AVX2__)
+#include <immintrin.h>
+#define USE_AVX2 1
+#endif
namespace storage {
+template <typename T>
+struct SIMDOps;
+
+template <>
+struct SIMDOps<int32_t> {
+#ifdef USE_SSE
+ static void rebase(int32_t* arr, int32_t min_val, size_t size) {
+ const __m128i min_vec = _mm_set1_epi32(min_val);
+ size_t i = 0;
+ for (; i + 3 < size; i += 4) {
+ __m128i vec = _mm_loadu_si128(reinterpret_cast<const __m128i*>(arr + i));
+ vec = _mm_sub_epi32(vec, min_vec);
+ _mm_storeu_si128(reinterpret_cast<__m128i*>(arr + i), vec);
+ }
+ for (; i < size; ++i) {
+ arr[i] -= min_val;
+ }
+ }
+#else
+ static void rebase(int32_t* arr, int32_t min_val, size_t size) {
+ for (size_t i = 0; i < size; ++i) {
+ arr[i] -= min_val;
+ }
+ }
+#endif
+};
+
+template <>
+struct SIMDOps<int64_t> {
+#ifdef USE_AVX2
+ static void rebase(int64_t* arr, int64_t min_val, size_t size) {
+ const __m256i min_vec = _mm256_set1_epi64x(min_val);
+ size_t i = 0;
+ for (; i + 3 < size; i += 4) {
+ __m256i vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(arr + i));
+ vec = _mm256_sub_epi64(vec, min_vec);
+ _mm256_storeu_si256(reinterpret_cast<__m256i*>(arr + i), vec);
+ }
+ for (; i < size; ++i) {
+ arr[i] -= min_val;
+ }
+ }
+#else
+ static void rebase(int64_t* arr, int64_t min_val, size_t size) {
+ for (size_t i = 0; i < size; ++i) {
+ arr[i] -= min_val;
+ }
+ }
+#endif
+};
+
template <typename T>
class TS2DIFFEncoder : public Encoder {
public:
@@ -168,9 +226,7 @@ inline int TS2DIFFEncoder<int32_t>::flush(common::ByteStream &out_stream) {
return common::E_OK;
}
// Subtract the minimum value for each delta_arr_ item
- for (int i = 0; i < write_index_; i++) {
- rebase_arr(i);
- }
+ SIMDOps<int32_t>::rebase(delta_arr_, delta_arr_min_, write_index_);
// Calculate the bit length of each value to writer
int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_);
// writer header
@@ -194,9 +250,7 @@ inline int TS2DIFFEncoder<int64_t>::flush(common::ByteStream &out_stream) {
return common::E_OK;
}
// Subtract the minimum value for each delta_arr_ item
- for (int i = 0; i < write_index_; i++) {
- rebase_arr(i);
- }
+ SIMDOps<int64_t>::rebase(delta_arr_, delta_arr_min_, write_index_);
// Calculate the bit length of each value to writer
int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_);
// writer header
diff --git a/cpp/src/reader/chunk_reader.cc b/cpp/src/reader/chunk_reader.cc
index f130ae6d..f1e395da 100644
--- a/cpp/src/reader/chunk_reader.cc
+++ b/cpp/src/reader/chunk_reader.cc
@@ -240,7 +240,7 @@ int ChunkReader::read_from_file_and_rewrap(int want_size) {
file_data_buf_size_ = read_size;
}
int ret_read_len = 0;
- if (RET_FAIL(read_file_->read(offset, file_data_buf, DEFAULT_READ_SIZE,
+ if (RET_FAIL(read_file_->read(offset, file_data_buf, read_size,
ret_read_len))) {
} else {
in_stream_.wrap_from(file_data_buf, ret_read_len);
@@ -436,8 +436,6 @@ int ChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(ByteStream &time_in,
row_appender.backoff_add_row();
continue;
} else {
- /*std::cout << "decoder: time=" << time << ", value=" << value
- * << std::endl;*/
row_appender.append(0, (char *)&time, sizeof(time));
row_appender.append(1, (char *)&value, sizeof(value));
}
@@ -476,7 +474,7 @@ int ChunkReader::decode_tv_buf_into_tsblock_by_datatype(ByteStream &time_in,
row_appender);
break;
case common::STRING:
- STRING_DECODE_TYPED_TV_INTO_TSBLOCK(time_in, value_in, row_appender,
+ ret = STRING_DECODE_TYPED_TV_INTO_TSBLOCK(time_in, value_in, row_appender,
*pa, filter);
break;
default:
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index 3b155dba..bfaf70f0 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -59,6 +59,8 @@ class TsFileWriter {
void set_generate_table_schema(bool generate_table_schema);
int register_timeseries(const std::string &device_id,
const MeasurementSchema &measurement_schema);
+ int register_timeseries(const std::string &device_path,
+ const std::vector<MeasurementSchema *> &measurement_schema_vec);
int register_aligned_timeseries(
const std::string &device_id,
const MeasurementSchema &measurement_schema);
@@ -161,9 +163,6 @@ class TsFileWriter {
int register_timeseries(const std::string &device_path,
MeasurementSchema *measurement_schema,
bool is_aligned = false);
- int register_timeseries(
- const std::string &device_path,
- const std::vector<MeasurementSchema *> &measurement_schema_vec);
std::vector<std::pair<std::shared_ptr<IDeviceID>, int>>
split_tablet_by_device(const Tablet &tablet);
diff --git a/cpp/test/encoding/ts2diff_codec_test.cc b/cpp/test/encoding/ts2diff_codec_test.cc
index 15836308..86988088 100644
--- a/cpp/test/encoding/ts2diff_codec_test.cc
+++ b/cpp/test/encoding/ts2diff_codec_test.cc
@@ -19,6 +19,7 @@
#include <gtest/gtest.h>
#include <bitset>
+#include <random>
#include "encoding/ts2diff_decoder.h"
#include "encoding/ts2diff_encoder.h"
@@ -142,4 +143,62 @@ TEST_F(TS2DIFFCodecTest, TestLongEncoding2) {
}
}
+TEST_F(TS2DIFFCodecTest, TestRandomEncoding) {
+ common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+ const int row_num = 10000;
+ int64_t data[row_num];
+ memset(data, 0, sizeof(int64_t) * row_num);
+
+ std::mt19937 rng(std::random_device{}());
+ int min = -100000;
+ int max = 100000;
+ std::uniform_int_distribution<int> dist(min, max);
+ for (int i = 0; i < row_num; i++) {
+ int random_number = dist(rng);
+ data[i] = random_number;
+ }
+
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(encoder_long_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(encoder_long_->flush(out_stream), common::E_OK);
+
+ int64_t x;
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(decoder_long_->read_int64(x, out_stream), common::E_OK);
+ EXPECT_EQ(x, data[i]);
+ }
+}
+
+TEST_F(TS2DIFFCodecTest, LargeDataTest) {
+ common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+ std::mt19937 gen(42);
+ std::uniform_int_distribution<int32_t> dist(-100000, 100000);
+ const int row_num = 2000000;
+ std::vector<int32_t> data(row_num);
+ for (int i = 0; i < row_num; i++) {
+ data[i] = dist(gen);
+ }
+
+ auto start_encode = std::chrono::steady_clock::now();
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(encoder_int_->encode(data[i], out_stream), common::E_OK);
+ }
+ EXPECT_EQ(encoder_int_->flush(out_stream), common::E_OK);
+ auto end_encode = std::chrono::steady_clock::now();
+
+ std::vector<int32_t> decoded(row_num);
+ auto start_decode = std::chrono::steady_clock::now();
+ for (int i = 0; i < row_num; i++) {
+ EXPECT_EQ(decoder_int_->read_int32(decoded[i], out_stream), common::E_OK);
+ }
+ auto end_decode = std::chrono::steady_clock::now();
+
+ auto encode_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_encode - start_encode);
+ auto decode_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_decode - start_decode);
+
+ std::cout << "Encode time: " << encode_duration.count() << "ms\n";
+ std::cout << "Decode time: " << decode_duration.count() << "ms\n";
+}
+
} // namespace storage
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index ce082c2b..3f99971f 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -137,7 +137,7 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) {
std::strcpy(literal, "device_id");
String literal_str(literal, std::strlen("device_id"));
- int row_num = 1000;
+ int row_num = 100000;
for (int i = 0; i < row_num; ++i) {
TsRecord record(1622505600000 + i * 100, device_name);
for (uint32_t j = 0; j < measurement_names.size(); j++) {
diff --git a/cpp/third_party/google_snappy/snappy.cc b/cpp/third_party/google_snappy/snappy.cc
index 877b65a7..80e5642e 100644
--- a/cpp/third_party/google_snappy/snappy.cc
+++ b/cpp/third_party/google_snappy/snappy.cc
@@ -68,6 +68,10 @@
#include <arm_acle.h>
#endif
+#if defined(__SSE4_2__)
+#include <immintrin.h>
+#endif
+
#include <algorithm>
#include <array>
#include <cstddef>