This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git


The following commit(s) were added to refs/heads/develop by this push:
     new 2c2e9117 Fix large page read and bitmap index (#427)
2c2e9117 is described below

commit 2c2e91176f316c19cee962a50926a6e95ae97dce
Author: Jiang Tian <[email protected]>
AuthorDate: Tue Mar 4 14:18:41 2025 +0800

    Fix large page read and bitmap index (#427)
    
    * Fix large page read and bitmap index
    
    * revert index
    
    * fix ts2diff decode
    
    * revert cur_value_index when failing to add a row
    
    * revert test
---
 cpp/src/cwrapper/tsfile_cwrapper.cc                |  2 +-
 cpp/src/encoding/ts2diff_decoder.h                 |  8 ++--
 cpp/src/encoding/ts2diff_encoder.h                 |  8 ++--
 cpp/src/reader/aligned_chunk_reader.cc             | 24 +++++++-----
 cpp/src/reader/aligned_chunk_reader.h              |  2 +-
 .../reader/block/single_device_tsblock_reader.cc   |  1 -
 cpp/src/reader/qds_without_timegenerator.cc        |  3 +-
 cpp/src/reader/table_query_executor.h              |  2 +-
 cpp/src/writer/tsfile_writer.cc                    |  2 +-
 cpp/test/encoding/ts2diff_codec_test.cc            | 44 +++++++++++++++++++++-
 .../reader/table_view/tsfile_reader_table_test.cc  | 15 +++++++-
 11 files changed, 85 insertions(+), 26 deletions(-)

diff --git a/cpp/src/cwrapper/tsfile_cwrapper.cc b/cpp/src/cwrapper/tsfile_cwrapper.cc
index 7e43ac80..e18c843a 100644
--- a/cpp/src/cwrapper/tsfile_cwrapper.cc
+++ b/cpp/src/cwrapper/tsfile_cwrapper.cc
@@ -37,7 +37,7 @@ static bool is_init = false;
 
 void init_tsfile_config() {
     if (!is_init) {
-        common::init_config_value();
+        common::init_common();
         is_init = true;
     }
 }
diff --git a/cpp/src/encoding/ts2diff_decoder.h b/cpp/src/encoding/ts2diff_decoder.h
index 76d77dc0..aa740fa7 100644
--- a/cpp/src/encoding/ts2diff_decoder.h
+++ b/cpp/src/encoding/ts2diff_decoder.h
@@ -74,6 +74,7 @@ class TS2DIFFDecoder : public Decoder {
     int64_t read_long(int bits, common::ByteStream &in) {
         int64_t value = 0;
         while (bits > 0) {
+            read_byte_if_empty(in);
             if (bits > bits_left_ || bits == 8) {
                 // Take only the bits_left_ "least significant" bits.
                 uint8_t d = (uint8_t)(buffer_ & ((1 << bits_left_) - 1));
@@ -93,7 +94,6 @@ class TS2DIFFDecoder : public Decoder {
             if (bits <= 0 && current_index_ == 0) {
                 break;
             }
-            read_byte_if_empty(in);
         }
         return value;
     }
@@ -129,14 +129,15 @@ inline int32_t TS2DIFFDecoder<int32_t>::decode(common::ByteStream &in) {
         ret_value = first_value_;
         bits_left_ = 0;
         buffer_ = 0;
-        read_byte_if_empty(in);
         current_index_ = 1;
         return ret_value;
     }
     if (current_index_++ >= write_index_) {
         current_index_ = 0;
     }
-    stored_value_ = (int32_t)read_long(bit_width_, in);
+    // although it seems we are reading an int64, bit_width_ guarantees
+    // that it does not overflow int32
+    stored_value_ = read_long(bit_width_, in);
     ret_value = stored_value_ + first_value_ + delta_min_;
     first_value_ = ret_value;
 
@@ -151,7 +152,6 @@ inline int64_t TS2DIFFDecoder<int64_t>::decode(common::ByteStream &in) {
         common::SerializationUtil::read_i64(delta_min_, in);
         common::SerializationUtil::read_i64(first_value_, in);
         ret_value = first_value_;
-        read_byte_if_empty(in);
         current_index_ = 1;
         return ret_value;
     }
diff --git a/cpp/src/encoding/ts2diff_encoder.h b/cpp/src/encoding/ts2diff_encoder.h
index 5e456bc7..a1c5a534 100644
--- a/cpp/src/encoding/ts2diff_encoder.h
+++ b/cpp/src/encoding/ts2diff_encoder.h
@@ -200,10 +200,10 @@ inline int TS2DIFFEncoder<int64_t>::flush(common::ByteStream &out_stream) {
     // Calculate the bit length of each value to writer
     int bit_width = cal_bit_width(delta_arr_max_ - delta_arr_min_);
     // writer header
-    common::SerializationUtil::write_ui32(write_index_, out_stream);
-    common::SerializationUtil::write_ui32(bit_width, out_stream);
-    common::SerializationUtil::write_ui64(delta_arr_min_, out_stream);
-    common::SerializationUtil::write_ui64(first_value_, out_stream);
+    common::SerializationUtil::write_i32(write_index_, out_stream);
+    common::SerializationUtil::write_i32(bit_width, out_stream);
+    common::SerializationUtil::write_i64(delta_arr_min_, out_stream);
+    common::SerializationUtil::write_i64(first_value_, out_stream);
     // writer data
     for (int i = 0; i < write_index_; i++) {
         write_bits(delta_arr_[i], bit_width, out_stream);
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index a6ce3e7b..6b89e801 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -289,7 +289,7 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta *&chunk_meta,
 // @in_stream_
 int AlignedChunkReader::read_from_file_and_rewrap(
     common::ByteStream &in_stream_, ChunkMeta *&chunk_meta,
-    uint32_t &chunk_visit_offset, int32_t file_data_buf_size, int want_size) {
+    uint32_t &chunk_visit_offset, int32_t &file_data_buf_size, int want_size) {
     int ret = E_OK;
     const int DEFAULT_READ_SIZE = 4096;  // may use page_size + 
page_header_size
     char *file_data_buf = in_stream_.get_wrapped_buf();
@@ -350,8 +350,8 @@ int AlignedChunkReader::decode_cur_time_page_data() {
         // << cur_page_header_.compressed_size_ << std::endl;
         if (RET_FAIL(read_from_file_and_rewrap(
                 time_in_stream_, time_chunk_meta_, time_chunk_visit_offset_,
-                cur_time_page_header_.compressed_size_,
-                file_data_time_buf_size_))) {
+                file_data_time_buf_size_,
+                cur_value_page_header_.compressed_size_))) {
         }
     }
 
@@ -429,8 +429,8 @@ int AlignedChunkReader::decode_cur_value_page_data() {
         // << cur_page_header_.compressed_size_ << std::endl;
         if (RET_FAIL(read_from_file_and_rewrap(
                 value_in_stream_, value_chunk_meta_, value_chunk_visit_offset_,
-                cur_value_page_header_.compressed_size_,
-                file_data_value_buf_size_))) {
+                file_data_value_buf_size_,
+                cur_value_page_header_.compressed_size_))) {
         }
     }
 
@@ -529,19 +529,26 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
         int64_t time = 0;                                                      
\
         CppType value;                                                         
\
         while ((time_decoder_->has_remaining() || time_in.has_remaining())     
\
-                && (value_decoder_->has_remaining() ||                        \
-                value_in.has_remaining())){                                    
 \
+                && (value_decoder_->has_remaining() ||                         
\
+                value_in.has_remaining())){                                    
\
             cur_value_index++;                                                 
\
             if (((value_page_col_notnull_bitmap_[cur_value_index / 8] &        
\
                   0xFF) &                                                      
\
                  (mask >> (cur_value_index % 8))) == 0) {                      
\
-                RET_FAIL(time_decoder_->read_int64(time, time_in));            
\
+                ret = time_decoder_->read_int64(time, time_in);                
\
                 if (ret != E_OK) {                                             
\
                     break;                                                     
\
                 }                                                              
\
+                ret = value_decoder_->read_##ReadType(value,                   
\
+                value_in);                                                     
\
+                if (ret != E_OK) {                                             
\
+                    break;                                                     
\
+                }                                                              
\
+                continue;                                                      
\
             }                                                                  
\
             if (UNLIKELY(!row_appender.add_row())) {                           
\
                 ret = E_OVERFLOW;                                              
\
+                cur_value_index--;                                            \
                 break;                                                         
\
             } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) {   
\
             } else if (RET_FAIL(value_decoder_->read_##ReadType(value,         
\
@@ -569,7 +576,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
         while ((time_decoder_->has_remaining() &&
                 value_decoder_->has_remaining()) ||
                (time_in.has_remaining() && value_in.has_remaining())) {
-            cur_value_index++;
             if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
                  (mask >> (cur_value_index % 8))) == 0) {
                 RET_FAIL(time_decoder_->read_int64(time, time_in));
diff --git a/cpp/src/reader/aligned_chunk_reader.h b/cpp/src/reader/aligned_chunk_reader.h
index 365e9efe..becca806 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -94,7 +94,7 @@ class AlignedChunkReader : public IChunkReader {
     int read_from_file_and_rewrap(common::ByteStream &in_stream_,
                                   ChunkMeta *&chunk_meta,
                                   uint32_t &chunk_visit_offset,
-                                  int32_t file_data_buf_size,
+                                  int32_t &file_data_buf_size,
                                   int want_size = 0);
     bool cur_page_statisify_filter(Filter *filter);
     int skip_cur_page();
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc
index e8ee6693..2c62d87f 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -39,7 +39,6 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
     int ret = common::E_OK;
     pa_.init(512, common::AllocModID::MOD_TSFILE_READER);
     tuple_desc_.reset();
-    common::init_common();
     auto table_schema = device_query_task->get_table_schema();
     tuple_desc_.push_back(common::g_time_column_schema);
     for (const auto& column_name : device_query_task_->get_column_names()) {
diff --git a/cpp/src/reader/qds_without_timegenerator.cc b/cpp/src/reader/qds_without_timegenerator.cc
index 0a4a6f38..805c9c4d 100644
--- a/cpp/src/reader/qds_without_timegenerator.cc
+++ b/cpp/src/reader/qds_without_timegenerator.cc
@@ -68,7 +68,8 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader,
 
     for (size_t i = 0; i < path_count; i++) {
         get_next_tsblock(i, true);
-        data_types.push_back(value_iters_[i]->get_data_type());
+        data_types.push_back(value_iters_[i] != nullptr ?
+            value_iters_[i]->get_data_type() : TSDataType::NULL_TYPE);
     }
     result_set_metadata_ =
         std::make_shared<ResultSetMetadata>(column_names, data_types);
diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h
index 32d522c0..e9a3c513 100644
--- a/cpp/src/reader/table_query_executor.h
+++ b/cpp/src/reader/table_query_executor.h
@@ -50,7 +50,7 @@ class TableQueryExecutor {
         tsfile_io_reader_->init(read_file);
         meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_);
         table_query_ordering_ = TableQueryOrdering::DEVICE;
-        block_size_ = 1024;
+        block_size_ = 10240;
     }
     ~TableQueryExecutor() {
         if (meta_data_querier_ != nullptr) {
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 8f88ec03..88e2927d 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -38,7 +38,7 @@ int libtsfile_init() {
     }
     ModStat::get_instance().init();
 
-    init_config_value();
+    init_common();
 
     g_s_is_inited = true;
     return E_OK;
diff --git a/cpp/test/encoding/ts2diff_codec_test.cc b/cpp/test/encoding/ts2diff_codec_test.cc
index cd6197cf..15836308 100644
--- a/cpp/test/encoding/ts2diff_codec_test.cc
+++ b/cpp/test/encoding/ts2diff_codec_test.cc
@@ -58,7 +58,7 @@ class TS2DIFFCodecTest : public ::testing::Test {
     LongTS2DIFFDecoder* decoder_long_;
 };
 
-TEST_F(TS2DIFFCodecTest, TestIntEncoding) {
+TEST_F(TS2DIFFCodecTest, TestIntEncoding1) {
     common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
     const int row_num = 10000;
     int32_t data[row_num];
@@ -79,7 +79,49 @@ TEST_F(TS2DIFFCodecTest, TestIntEncoding) {
     }
 }
 
+TEST_F(TS2DIFFCodecTest, TestIntEncoding2) {
+    common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+    const int row_num = 10000;
+    int32_t data[row_num];
+    memset(data, 0, sizeof(int32_t) * row_num);
+    for (int i = 0; i < row_num; i++) {
+        data[i] = i;
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(encoder_int_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(encoder_int_->flush(out_stream), common::E_OK);
+
+    int32_t x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(decoder_int_->read_int32(x, out_stream), common::E_OK);
+        EXPECT_EQ(x, data[i]);
+    }
+}
+
 TEST_F(TS2DIFFCodecTest, TestLongEncoding) {
+    common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
+    const int row_num = 10000;
+    int64_t data[row_num];
+    memset(data, 0, sizeof(int64_t) * row_num);
+    for (int i = 0; i < row_num; i++) {
+        data[i] = i;
+    }
+
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(encoder_long_->encode(data[i], out_stream), common::E_OK);
+    }
+    EXPECT_EQ(encoder_long_->flush(out_stream), common::E_OK);
+
+    int64_t x;
+    for (int i = 0; i < row_num; i++) {
+        EXPECT_EQ(decoder_long_->read_int64(x, out_stream), common::E_OK);
+        EXPECT_EQ(x, data[i]);
+    }
+}
+
+TEST_F(TS2DIFFCodecTest, TestLongEncoding2) {
     common::ByteStream out_stream(1024, common::MOD_TS2DIFF_OBJ, false);
     const int row_num = 10000;
     int64_t data[row_num];
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index 1fb787e8..0d8f6832 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -102,7 +102,8 @@ class TsFileTableReaderTest : public ::testing::Test {
         storage::Tablet tablet(table_schema->get_table_name(),
                                table_schema->get_measurement_names(),
                                table_schema->get_data_types(),
-                               table_schema->get_column_categories());
+                               table_schema->get_column_categories(),
+                               device_num * num_timestamp_per_device);
 
         char* literal = new char[std::strlen("device_id") + 1];
         std::strcpy(literal, "device_id");
@@ -199,8 +200,18 @@ class TsFileTableReaderTest : public ::testing::Test {
 
 TEST_F(TsFileTableReaderTest, TableModelQuery) { test_table_model_query(); }
 
-TEST_F(TsFileTableReaderTest, TableModelQueryOnePage) {
+TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) {
+    int prev_config = g_config_value_.page_writer_max_point_num_;
+    g_config_value_.page_writer_max_point_num_ = 5;
     test_table_model_query(g_config_value_.page_writer_max_point_num_);
+    g_config_value_.page_writer_max_point_num_ = prev_config;
+}
+
+TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) {
+    int prev_config = g_config_value_.page_writer_max_point_num_;
+    g_config_value_.page_writer_max_point_num_ = 10000;
+    test_table_model_query(g_config_value_.page_writer_max_point_num_);
+    g_config_value_.page_writer_max_point_num_ = prev_config;
 }
 
 TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {

Reply via email to