This is an automated email from the ASF dual-hosted git repository.

hongzhigao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 434f0cb7 Fix bugs and optimize TS2diff encoding. (#428)
434f0cb7 is described below

commit 434f0cb73473fa35b1004699f8deedcffb571a42
Author: Hongzhi Gao <[email protected]>
AuthorDate: Tue Mar 4 18:21:36 2025 +0800

    Fix bugs and optimize TS2diff encoding. (#428)
    
    * ts2diff SIMD
    
    * fix snappy_compressor.cc
    
    * fix page_writer_max_point_num_
    
    * Fix the issue of reading large pages.
    
    * Fix the issue of reading large pages.
    
    * Fix some issues
    
    * Fix snappy_compressor.cc
---
 cpp/src/common/global.cc                |  2 +-
 cpp/src/encoding/ts2diff_encoder.h      | 66 ++++++++++++++++++++++++++++++---
 cpp/src/reader/chunk_reader.cc          |  6 +--
 cpp/src/writer/tsfile_writer.h          |  5 +--
 cpp/test/encoding/ts2diff_codec_test.cc | 59 +++++++++++++++++++++++++++++
 cpp/test/writer/tsfile_writer_test.cc   |  2 +-
 cpp/third_party/google_snappy/snappy.cc |  4 ++
 7 files changed, 129 insertions(+), 15 deletions(-)

diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index 3fbe1872..7196633b 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -35,7 +35,7 @@ void init_config_value() {
     g_config_value_.tsblock_mem_inc_step_size_ = 8000;  // 8k
     g_config_value_.tsblock_max_memory_ = 64000;        // 64k
     // g_config_value_.tsblock_max_memory_ = 32;
-    g_config_value_.page_writer_max_point_num_ = 5;
+    g_config_value_.page_writer_max_point_num_ = 10000;
     g_config_value_.page_writer_max_memory_bytes_ = 128 * 1024;  // 128 k
     g_config_value_.max_degree_of_index_node_ = 256;
     g_config_value_.tsfile_index_bloom_filter_error_percent_ = 0.05;
diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h
index a1c5a534..d2da14ca 100644
--- a/cpp/src/encoding/ts2diff_encoder.h
+++ b/cpp/src/encoding/ts2diff_encoder.h
@@ -25,9 +25,67 @@
 #include "common/allocator/alloc_base.h"
 #include "common/allocator/byte_stream.h"
 #include "encoder.h"
+#if defined(__SSE2__) || defined(__AVX2__)
+#include <immintrin.h>  // declares both the SSE2 and AVX2 intrinsics used below
+#define USE_SSE 1       // the int32 rebase needs only SSE2 (x86-64 baseline)
+#endif
+#if defined(__AVX2__)  // separate #if: -mavx2 also defines __SSE4_2__, so an
+#define USE_AVX2 1      // #elif after the SSE check could never be reached
+#endif

 namespace storage {

+template <typename T>
+struct SIMDOps;  // rebase(arr, min_val, size): arr[i] -= min_val for i < size
+
+template <>
+struct SIMDOps<int32_t> {
+#ifdef USE_SSE  // 4 x int32 lanes per 128-bit register; scalar loop handles the tail
+    static void rebase(int32_t* arr, int32_t min_val, size_t size) {
+        const __m128i min_vec = _mm_set1_epi32(min_val);
+        size_t i = 0;
+        for (; i + 3 < size; i += 4) {
+            __m128i vec = _mm_loadu_si128(reinterpret_cast<const __m128i*>(arr + i));
+            vec = _mm_sub_epi32(vec, min_vec);
+            _mm_storeu_si128(reinterpret_cast<__m128i*>(arr + i), vec);
+        }
+        for (; i < size; ++i) {
+            arr[i] -= min_val;
+        }
+    }
+#else
+    static void rebase(int32_t* arr, int32_t min_val, size_t size) {
+        for (size_t i = 0; i < size; ++i) {
+            arr[i] -= min_val;
+        }
+    }
+#endif
+};
+
+template <>
+struct SIMDOps<int64_t> {
+#ifdef USE_AVX2  // 4 x int64 lanes per 256-bit register; scalar loop handles the tail
+    static void rebase(int64_t* arr, int64_t min_val, size_t size) {
+        const __m256i min_vec = _mm256_set1_epi64x(min_val);
+        size_t i = 0;
+        for (; i + 3 < size; i += 4) {
+            __m256i vec = _mm256_loadu_si256(reinterpret_cast<const __m256i*>(arr + i));
+            vec = _mm256_sub_epi64(vec, min_vec);
+            _mm256_storeu_si256(reinterpret_cast<__m256i*>(arr + i), vec);
+        }
+        for (; i < size; ++i) {
+            arr[i] -= min_val;
+        }
+    }
+#else
+    static void rebase(int64_t* arr, int64_t min_val, size_t size) {
+        for (size_t i = 0; i < size; ++i) {
+            arr[i] -= min_val;
+        }
+    }
+#endif
+};
+
 template <typename T>
 class TS2DIFFEncoder : public Encoder {
    public:
@@ -168,9 +226,7 @@ inline int TS2DIFFEncoder<int32_t>::flush(common::ByteStream &out_stream) {
         return common::E_OK;
     }
     // Subtract the minimum value for each delta_arr_ item
-    for (int i = 0; i < write_index_; i++) {
-        rebase_arr(i);
-    }
+    SIMDOps<int32_t>::rebase(delta_arr_, delta_arr_min_, write_index_);
     // Calculate the bit length of each value to writer
     int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_);
     // writer header
@@ -194,9 +250,7 @@ inline int TS2DIFFEncoder<int64_t>::flush(common::ByteStream &out_stream) {
         return common::E_OK;
     }
     // Subtract the minimum value for each delta_arr_ item
-    for (int i = 0; i < write_index_; i++) {
-        rebase_arr(i);
-    }
+    SIMDOps<int64_t>::rebase(delta_arr_, delta_arr_min_, write_index_);
     // Calculate the bit length of each value to writer
     int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_);
     // writer header
diff --git a/cpp/src/reader/chunk_reader.cc b/cpp/src/reader/chunk_reader.cc
index f130ae6d..f1e395da 100644
--- a/cpp/src/reader/chunk_reader.cc
+++ b/cpp/src/reader/chunk_reader.cc
@@ -240,7 +240,7 @@ int ChunkReader::read_from_file_and_rewrap(int want_size) {
         file_data_buf_size_ = read_size;
     }
     int ret_read_len = 0;
-    if (RET_FAIL(read_file_->read(offset, file_data_buf, DEFAULT_READ_SIZE,
+    if (RET_FAIL(read_file_->read(offset, file_data_buf, read_size,
                                   ret_read_len))) {
     } else {
         in_stream_.wrap_from(file_data_buf, ret_read_len);
@@ -436,8 +436,6 @@ int ChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(ByteStream &time_in,
             row_appender.backoff_add_row();
             continue;
         } else {
-            /*std::cout << "decoder: time=" << time << ", value=" << value
-             * << std::endl;*/
             row_appender.append(0, (char *)&time, sizeof(time));
             row_appender.append(1, (char *)&value, sizeof(value));
         }
@@ -476,7 +474,7 @@ int ChunkReader::decode_tv_buf_into_tsblock_by_datatype(ByteStream &time_in,
                                          row_appender);
             break;
         case common::STRING:
-            STRING_DECODE_TYPED_TV_INTO_TSBLOCK(time_in, value_in, row_appender,
+            ret = STRING_DECODE_TYPED_TV_INTO_TSBLOCK(time_in, value_in, row_appender,
                                                 *pa, filter);
             break;
         default:
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index 3b155dba..bfaf70f0 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -59,6 +59,8 @@ class TsFileWriter {
     void set_generate_table_schema(bool generate_table_schema);
     int register_timeseries(const std::string &device_id,
                             const MeasurementSchema &measurement_schema);
+    int register_timeseries(const std::string &device_path,
+        const std::vector<MeasurementSchema *> &measurement_schema_vec);
     int register_aligned_timeseries(
         const std::string &device_id,
         const MeasurementSchema &measurement_schema);
@@ -161,9 +163,6 @@ class TsFileWriter {
     int register_timeseries(const std::string &device_path,
                             MeasurementSchema *measurement_schema,
                             bool is_aligned = false);
-    int register_timeseries(
-        const std::string &device_path,
-        const std::vector<MeasurementSchema *> &measurement_schema_vec);
     std::vector<std::pair<std::shared_ptr<IDeviceID>, int>>
     split_tablet_by_device(const Tablet &tablet);
 
diff --git a/cpp/test/encoding/ts2diff_codec_test.cc b/cpp/test/encoding/ts2diff_codec_test.cc
index 15836308..86988088 100644
--- a/cpp/test/encoding/ts2diff_codec_test.cc
+++ b/cpp/test/encoding/ts2diff_codec_test.cc
@@ -19,6 +19,7 @@
 #include <gtest/gtest.h>
 
 #include <bitset>
+#include <random>
 
 #include "encoding/ts2diff_decoder.h"
 #include "encoding/ts2diff_encoder.h"
@@ -142,4 +143,62 @@ TEST_F(TS2DIFFCodecTest, TestLongEncoding2) {
     }
 }
 
+TEST_F(TS2DIFFCodecTest, TestRandomEncoding) {
+    common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+    const int row_num = 10000;
+    std::vector<int64_t> data(row_num);
+    const unsigned int seed = std::random_device{}();
+    SCOPED_TRACE(::testing::Message() << "seed = " << seed);  // replay failures
+    std::mt19937 rng(seed);
+    const int min = -100000;
+    const int max = 100000;
+    std::uniform_int_distribution<int> dist(min, max);
+    for (int i = 0; i < row_num; i++) {
+        int random_number = dist(rng);
+        data[i] = random_number;
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(encoder_long_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(encoder_long_->flush(out_stream), common::E_OK);
+
+    int64_t x = 0;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(decoder_long_->read_int64(x, out_stream), common::E_OK);
+        EXPECT_EQ(x, data[i]);
+    }
+}
+
+TEST_F(TS2DIFFCodecTest, LargeDataTest) {
+    common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+    std::mt19937 gen(42);
+    std::uniform_int_distribution<int32_t> dist(-100000, 100000);
+    const int row_num = 2000000;
+    std::vector<int32_t> data(row_num);
+    for (int i = 0; i < row_num; i++) {
+        data[i] = dist(gen);
+    }
+
+    auto start_encode = std::chrono::steady_clock::now();
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(encoder_int_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(encoder_int_->flush(out_stream), common::E_OK);
+    auto end_encode = std::chrono::steady_clock::now();
+
+    std::vector<int32_t> decoded(row_num);
+    auto start_decode = std::chrono::steady_clock::now();
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(decoder_int_->read_int32(decoded[i], out_stream), common::E_OK);
+    }
+    auto end_decode = std::chrono::steady_clock::now();
+
+    EXPECT_EQ(decoded, data);  // values must round-trip, not just status codes
+    auto encode_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_encode - start_encode);
+    auto decode_duration = std::chrono::duration_cast<std::chrono::milliseconds>(end_decode - start_decode);
+
+    std::cout << "Encode time: " << encode_duration.count() << "ms\nDecode time: " << decode_duration.count() << "ms\n";
+}
+
 }  // namespace storage
diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc
index ce082c2b..3f99971f 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -137,7 +137,7 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) {
     std::strcpy(literal, "device_id");
     String literal_str(literal, std::strlen("device_id"));
 
-    int row_num = 1000;
+    int row_num = 100000;
     for (int i = 0; i < row_num; ++i) {
         TsRecord record(1622505600000 + i * 100, device_name);
         for (uint32_t j = 0; j < measurement_names.size(); j++) {
diff --git a/cpp/third_party/google_snappy/snappy.cc b/cpp/third_party/google_snappy/snappy.cc
index 877b65a7..80e5642e 100644
--- a/cpp/third_party/google_snappy/snappy.cc
+++ b/cpp/third_party/google_snappy/snappy.cc
@@ -68,6 +68,10 @@
 #include <arm_acle.h>
 #endif
 
+#if defined(__SSE4_2__)
+#include <immintrin.h>
+#endif
+
 #include <algorithm>
 #include <array>
 #include <cstddef>

Reply via email to