This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch read_parallel
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit d4be0ab9e23b28c9352c0579230caedc820545a1
Author: ColinLee <[email protected]>
AuthorDate: Sat Apr 11 09:19:53 2026 +0800

    support read parallel.
---
 cpp/src/common/config/config.h                     |    2 +
 cpp/src/common/global.cc                           |    2 +
 cpp/src/common/global.h                            |   14 +
 cpp/src/common/tsblock/tsblock.h                   |    6 +
 cpp/src/common/tsfile_common.h                     |   42 +
 cpp/src/encoding/decoder.h                         |   97 ++
 cpp/src/file/tsfile_io_reader.cc                   |  116 +++
 cpp/src/file/tsfile_io_reader.h                    |    5 +
 cpp/src/reader/aligned_chunk_reader.cc             | 1066 +++++++++++++++++++-
 cpp/src/reader/aligned_chunk_reader.h              |  166 ++-
 cpp/src/reader/filter/filter.h                     |   13 +
 cpp/src/reader/tsfile_series_scan_iterator.cc      |  203 +++-
 cpp/src/reader/tsfile_series_scan_iterator.h       |   51 +-
 .../table_view/tsfile_reader_table_batch_test.cc   |   17 +-
 .../reader/table_view/tsfile_reader_table_test.cc  |   47 +-
 .../table_view/tsfile_table_query_by_row_test.cc   |   46 +-
 .../reader/tree_view/tsfile_reader_tree_test.cc    |   21 +-
 .../tree_view/tsfile_tree_query_by_row_test.cc     |   66 +-
 cpp/test/reader/tsfile_reader_test.cc              |   15 +-
 19 files changed, 1821 insertions(+), 174 deletions(-)

diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h
index e2b2039a..8917593a 100644
--- a/cpp/src/common/config/config.h
+++ b/cpp/src/common/config/config.h
@@ -46,7 +46,9 @@ typedef struct ConfigValue {
     TSEncoding double_encoding_type_;
     TSEncoding string_encoding_type_;
     CompressionType default_compression_type_;
+    bool parallel_read_enabled_;
     bool parallel_write_enabled_;
+    int32_t read_thread_count_;
     int32_t write_thread_count_;
     // When true, aligned writer enforces page size limit strictly by
     // interleaving time/value writes and sealing pages together when any side
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index ea4bf128..c47f3e8b 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -69,6 +69,8 @@ void init_config_value() {
 #else
     g_config_value_.default_compression_type_ = UNCOMPRESSED;
 #endif
+    g_config_value_.parallel_read_enabled_ = true;
+    g_config_value_.read_thread_count_ = 4;
     unsigned int hw_cores = std::thread::hardware_concurrency();
     if (hw_cores == 0) hw_cores = 1;  // fallback if detection fails
     g_config_value_.parallel_write_enabled_ = (hw_cores > 1);
diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h
index ba5f4bd4..fedb97da 100644
--- a/cpp/src/common/global.h
+++ b/cpp/src/common/global.h
@@ -163,6 +163,20 @@ FORCE_INLINE uint8_t get_global_compression() {
     return static_cast<uint8_t>(g_config_value_.default_compression_type_);
 }
 
+FORCE_INLINE void set_parallel_read_enabled(bool enabled) {
+    g_config_value_.parallel_read_enabled_ = enabled;
+}
+
+FORCE_INLINE bool get_parallel_read_enabled() {
+    return g_config_value_.parallel_read_enabled_;
+}
+
+FORCE_INLINE int set_read_thread_count(int32_t count) {
+    if (count < 1 || count > 64) return E_INVALID_ARG;
+    g_config_value_.read_thread_count_ = count;
+    return E_OK;
+}
+
 FORCE_INLINE void set_parallel_write_enabled(bool enabled) {
     g_config_value_.parallel_write_enabled_ = enabled;
 }
diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index 859ad393..c7744f07 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -144,6 +144,12 @@ class RowAppender {
         ASSERT(tsblock_->row_count_ > 0);
         tsblock_->row_count_--;
     }
+    FORCE_INLINE uint32_t remaining() const {
+        return tsblock_->max_row_count_ - tsblock_->row_count_;
+    }
+    FORCE_INLINE void add_rows(uint32_t count) {
+        tsblock_->row_count_ += count;
+    }
 
     FORCE_INLINE void append(uint32_t slot_index, const char* value,
                              uint32_t len) {
diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h
index c866e499..1a7106f8 100644
--- a/cpp/src/common/tsfile_common.h
+++ b/cpp/src/common/tsfile_common.h
@@ -314,6 +314,11 @@ class ITimeseriesIndex {
     virtual common::SimpleList<ChunkMeta*>* get_value_chunk_meta_list() const {
         return nullptr;
     }
+    virtual uint32_t get_value_column_count() const { return 1; }
+    virtual common::SimpleList<ChunkMeta*>* get_value_chunk_meta_list(
+        uint32_t col_index) const {
+        return col_index == 0 ? get_value_chunk_meta_list() : nullptr;
+    }
 
     virtual common::String get_measurement_name() const {
         return common::String();
@@ -605,6 +610,43 @@ class AlignedTimeseriesIndex : public ITimeseriesIndex {
 #endif
 };
 
+class MultiAlignedTimeseriesIndex : public ITimeseriesIndex {
+   public:
+    TimeseriesIndex* time_ts_idx_ = nullptr;
+    std::vector<TimeseriesIndex*> value_ts_idxs_;
+
+    MultiAlignedTimeseriesIndex() {}
+    ~MultiAlignedTimeseriesIndex() {}
+
+    common::SimpleList<ChunkMeta*>* get_time_chunk_meta_list() const override {
+        return time_ts_idx_ ? time_ts_idx_->get_chunk_meta_list() : nullptr;
+    }
+    common::SimpleList<ChunkMeta*>* get_value_chunk_meta_list() const override {
+        return value_ts_idxs_.empty()
+                   ? nullptr
+                   : value_ts_idxs_[0]->get_chunk_meta_list();
+    }
+    uint32_t get_value_column_count() const override {
+        return value_ts_idxs_.size();
+    }
+    common::SimpleList<ChunkMeta*>* get_value_chunk_meta_list(
+        uint32_t col_index) const override {
+        return col_index < value_ts_idxs_.size()
+                   ? value_ts_idxs_[col_index]->get_chunk_meta_list()
+                   : nullptr;
+    }
+    common::String get_measurement_name() const override {
+        return value_ts_idxs_.empty()
+                   ? common::String()
+                   : value_ts_idxs_[0]->get_measurement_name();
+    }
+    common::TSDataType get_data_type() const override {
+        return time_ts_idx_ ? time_ts_idx_->get_data_type()
+                            : common::INVALID_DATATYPE;
+    }
+    Statistic* get_statistic() const override { return nullptr; }
+};
+
 class TSMIterator {
    public:
     explicit TSMIterator(
diff --git a/cpp/src/encoding/decoder.h b/cpp/src/encoding/decoder.h
index c290b579..f85ccf1b 100644
--- a/cpp/src/encoding/decoder.h
+++ b/cpp/src/encoding/decoder.h
@@ -21,6 +21,7 @@
 #define ENCODING_DECODER_H
 
 #include "common/allocator/byte_stream.h"
+#include "common/db_common.h"
 
 namespace storage {
 
@@ -37,6 +38,102 @@ class Decoder {
     virtual int read_double(double& ret_value, common::ByteStream& in) = 0;
     virtual int read_String(common::String& ret_value, common::PageArena& pa,
                             common::ByteStream& in) = 0;
+
+    virtual int read_batch_int32(int32_t* out, int capacity, int& actual,
+                                 common::ByteStream& in) {
+        actual = 0;
+        int ret = common::E_OK;
+        int32_t val;
+        while (actual < capacity && has_remaining(in)) {
+            ret = read_int32(val, in);
+            if (ret != common::E_OK) return ret;
+            out[actual++] = val;
+        }
+        return common::E_OK;
+    }
+
+    virtual int read_batch_int64(int64_t* out, int capacity, int& actual,
+                                 common::ByteStream& in) {
+        actual = 0;
+        int ret = common::E_OK;
+        int64_t val;
+        while (actual < capacity && has_remaining(in)) {
+            ret = read_int64(val, in);
+            if (ret != common::E_OK) return ret;
+            out[actual++] = val;
+        }
+        return common::E_OK;
+    }
+
+    virtual int read_batch_float(float* out, int capacity, int& actual,
+                                 common::ByteStream& in) {
+        actual = 0;
+        int ret = common::E_OK;
+        float val;
+        while (actual < capacity && has_remaining(in)) {
+            ret = read_float(val, in);
+            if (ret != common::E_OK) return ret;
+            out[actual++] = val;
+        }
+        return common::E_OK;
+    }
+
+    virtual int read_batch_double(double* out, int capacity, int& actual,
+                                  common::ByteStream& in) {
+        actual = 0;
+        int ret = common::E_OK;
+        double val;
+        while (actual < capacity && has_remaining(in)) {
+            ret = read_double(val, in);
+            if (ret != common::E_OK) return ret;
+            out[actual++] = val;
+        }
+        return common::E_OK;
+    }
+
+    virtual int skip_int32(int count, int& skipped, common::ByteStream& in) {
+        skipped = 0;
+        int32_t dummy;
+        while (skipped < count && has_remaining(in)) {
+            int ret = read_int32(dummy, in);
+            if (ret != common::E_OK) return ret;
+            ++skipped;
+        }
+        return common::E_OK;
+    }
+
+    virtual int skip_int64(int count, int& skipped, common::ByteStream& in) {
+        skipped = 0;
+        int64_t dummy;
+        while (skipped < count && has_remaining(in)) {
+            int ret = read_int64(dummy, in);
+            if (ret != common::E_OK) return ret;
+            ++skipped;
+        }
+        return common::E_OK;
+    }
+
+    virtual int skip_float(int count, int& skipped, common::ByteStream& in) {
+        skipped = 0;
+        float dummy;
+        while (skipped < count && has_remaining(in)) {
+            int ret = read_float(dummy, in);
+            if (ret != common::E_OK) return ret;
+            ++skipped;
+        }
+        return common::E_OK;
+    }
+
+    virtual int skip_double(int count, int& skipped, common::ByteStream& in) {
+        skipped = 0;
+        double dummy;
+        while (skipped < count && has_remaining(in)) {
+            int ret = read_double(dummy, in);
+            if (ret != common::E_OK) return ret;
+            ++skipped;
+        }
+        return common::E_OK;
+    }
 };
 
 }  // end namespace storage
diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc
index e96008a4..7f32ee7e 100644
--- a/cpp/src/file/tsfile_io_reader.cc
+++ b/cpp/src/file/tsfile_io_reader.cc
@@ -80,6 +80,122 @@ int TsFileIOReader::alloc_ssi(std::shared_ptr<IDeviceID> device_id,
     return ret;
 }
 
+int TsFileIOReader::alloc_multi_ssi(
+    std::shared_ptr<IDeviceID> device_id,
+    const std::vector<std::string>& measurement_names,
+    TsFileSeriesScanIterator*& ssi, common::PageArena& pa,
+    Filter* time_filter) {
+    int ret = E_OK;
+    if (RET_FAIL(load_tsfile_meta_if_necessary())) return ret;
+
+    ssi = new TsFileSeriesScanIterator;
+    ssi->init(device_id, measurement_names.empty() ? "" : measurement_names[0],
+              read_file_, time_filter, pa);
+
+    auto& ssi_pa = ssi->timeseries_index_pa_;
+
+    // Load device meta index
+    std::shared_ptr<IMetaIndexEntry> device_index_entry;
+    int64_t end_offset;
+    if (RET_FAIL(load_device_index_entry(
+            std::make_shared<DeviceIDComparable>(device_id),
+            device_index_entry, end_offset))) {
+        delete ssi;
+        ssi = nullptr;
+        return ret;
+    }
+
+    // Read measurement-level meta index node
+    int64_t start_offset = device_index_entry->get_offset();
+    ASSERT(start_offset < end_offset);
+    const int32_t read_size = end_offset - start_offset;
+    int32_t ret_read_len = 0;
+    char* data_buf = (char*)ssi_pa.alloc(read_size);
+    void* m_idx_node_buf = ssi_pa.alloc(sizeof(MetaIndexNode));
+    if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
+        delete ssi;
+        ssi = nullptr;
+        return E_OOM;
+    }
+    auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&ssi_pa);
+    auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr,
+                                                    MetaIndexNode::self_deleter);
+    if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
+                                  ret_read_len))) {
+        delete ssi;
+        ssi = nullptr;
+        return ret;
+    }
+    if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
+        delete ssi;
+        ssi = nullptr;
+        return ret;
+    }
+
+    if (!is_aligned_device(top_node)) {
+        delete ssi;
+        ssi = nullptr;
+        return E_NOT_SUPPORT;
+    }
+
+    // Get time column metadata
+    TimeseriesIndex* time_ts_idx = nullptr;
+    if (RET_FAIL(get_time_column_metadata(top_node, time_ts_idx, ssi_pa))) {
+        delete ssi;
+        ssi = nullptr;
+        return ret;
+    }
+
+    // Create MultiAlignedTimeseriesIndex
+    void* multi_buf = ssi_pa.alloc(sizeof(MultiAlignedTimeseriesIndex));
+    if (IS_NULL(multi_buf)) {
+        delete ssi;
+        ssi = nullptr;
+        return E_OOM;
+    }
+    auto* multi_idx = new (multi_buf) MultiAlignedTimeseriesIndex;
+    multi_idx->time_ts_idx_ = time_ts_idx;
+
+    // Load each measurement's TimeseriesIndex
+    for (const auto& meas_name : measurement_names) {
+        std::shared_ptr<IMetaIndexEntry> meas_entry;
+        int64_t meas_end_offset = 0;
+        if (RET_FAIL(load_measurement_index_entry(
+                meas_name, top_node, meas_entry, meas_end_offset))) {
+            delete ssi;
+            ssi = nullptr;
+            return ret;
+        }
+
+        ITimeseriesIndex* ts_idx = nullptr;
+        if (RET_FAIL(do_load_timeseries_index(
+                meas_name, meas_entry->get_offset(), meas_end_offset, ssi_pa,
+                ts_idx, /*is_aligned=*/true))) {
+            delete ssi;
+            ssi = nullptr;
+            return ret;
+        }
+
+        auto* aligned_idx = dynamic_cast<AlignedTimeseriesIndex*>(ts_idx);
+        if (aligned_idx && aligned_idx->value_ts_idx_) {
+            multi_idx->value_ts_idxs_.push_back(aligned_idx->value_ts_idx_);
+        } else {
+            delete ssi;
+            ssi = nullptr;
+            return E_NOT_EXIST;
+        }
+    }
+
+    ssi->itimeseries_index_ = multi_idx;
+
+    if (RET_FAIL(ssi->init_chunk_reader())) {
+        ssi->destroy();
+        delete ssi;
+        ssi = nullptr;
+    }
+    return ret;
+}
+
 void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator* ssi) {
     if (ssi != nullptr) {
         ssi->destroy();
diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h
index 2f4135e0..e6e0a04a 100644
--- a/cpp/src/file/tsfile_io_reader.h
+++ b/cpp/src/file/tsfile_io_reader.h
@@ -59,6 +59,11 @@ class TsFileIOReader {
                   TsFileSeriesScanIterator*& ssi, common::PageArena& pa,
                   Filter* time_filter = nullptr);
 
+    int alloc_multi_ssi(std::shared_ptr<IDeviceID> device_id,
+                        const std::vector<std::string>& measurement_names,
+                        TsFileSeriesScanIterator*& ssi, common::PageArena& pa,
+                        Filter* time_filter = nullptr);
+
     void revert_ssi(TsFileSeriesScanIterator* ssi);
 
     std::string get_file_path() const { return read_file_->file_path(); }
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 955715d4..6b9b02e2 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -19,8 +19,13 @@
 
 #include "aligned_chunk_reader.h"
 
+#include <algorithm>
 #include <limits>
 
+#include "common/global.h"
+#ifdef ENABLE_THREADS
+#include "common/thread_pool.h"
+#endif
 #include "compress/compressor_factory.h"
 #include "encoding/decoder_factory.h"
 
@@ -56,19 +61,49 @@ void AlignedChunkReader::reset() {
     if (file_data_buf != nullptr) {
         mem_free(file_data_buf);
     }
+    time_in_stream_.clear_wrapped_buf();
     time_in_stream_.reset();
     file_data_buf = value_in_stream_.get_wrapped_buf();
     if (file_data_buf != nullptr) {
         mem_free(file_data_buf);
     }
+    value_in_stream_.clear_wrapped_buf();
     value_in_stream_.reset();
     file_data_time_buf_size_ = 0;
     file_data_value_buf_size_ = 0;
     time_chunk_visit_offset_ = 0;
     value_chunk_visit_offset_ = 0;
+
+    // Free leftover uncompressed buffers from the previous chunk.
+    if (time_uncompressed_buf_ != nullptr && time_compressor_ != nullptr) {
+        time_compressor_->after_uncompress(time_uncompressed_buf_);
+        time_uncompressed_buf_ = nullptr;
+    }
+
+    // Multi-value reset
+    for (auto* col : value_columns_) {
+        if (col->uncompressed_buf != nullptr && col->compressor != nullptr) {
+            col->compressor->after_uncompress(col->uncompressed_buf);
+            col->uncompressed_buf = nullptr;
+        }
+        char* buf = col->in_stream.get_wrapped_buf();
+        if (buf != nullptr) mem_free(buf);
+        col->in_stream.clear_wrapped_buf();
+        col->in_stream.reset();
+        col->in.reset();
+        col->chunk_header.reset();
+        col->cur_page_header.reset();
+        col->file_data_buf_size = 0;
+        col->chunk_visit_offset = 0;
+        col->notnull_bitmap.clear();
+        col->cur_value_index = -1;
+        col->chunk_meta = nullptr;
+    }
+    cleanup_chunk_decode();
 }
 
 void AlignedChunkReader::destroy() {
+    cleanup_chunk_decode();
     if (time_uncompressed_buf_ != nullptr && time_compressor_ != nullptr) {
         time_compressor_->after_uncompress(time_uncompressed_buf_);
         time_uncompressed_buf_ = nullptr;
@@ -112,6 +147,32 @@ void AlignedChunkReader::destroy() {
     }
     cur_value_page_header_.reset();
     chunk_header_.~ChunkHeader();
+
+    // Multi-value destroy
+    for (size_t ci = 0; ci < value_columns_.size(); ci++) {
+        auto* col = value_columns_[ci];
+        if (col->decoder != nullptr) {
+            col->decoder->~Decoder();
+            DecoderFactory::free(col->decoder);
+            col->decoder = nullptr;
+        }
+        if (col->compressor != nullptr) {
+            col->compressor->~Compressor();
+            CompressorFactory::free(col->compressor);
+            col->compressor = nullptr;
+        }
+        buf = col->in_stream.get_wrapped_buf();
+        if (buf != nullptr) {
+            mem_free(buf);
+            col->in_stream.clear_wrapped_buf();
+        }
+        col->cur_page_header.reset();
+        delete col;
+    }
+    value_columns_.clear();
+#ifdef ENABLE_THREADS
+    decode_pool_ = nullptr;  // borrowed, not owned
+#endif
 }
 
 int AlignedChunkReader::load_by_aligned_meta(ChunkMeta* time_chunk_meta,
@@ -218,15 +279,20 @@ int AlignedChunkReader::alloc_compressor_and_decoder(
 
 int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
                                       Filter* oneshoot_filter, PageArena& pa) {
+    if (multi_value_mode_) {
+        return get_next_page_multi(ret_tsblock, oneshoot_filter, pa);
+    }
     int ret = E_OK;
     Filter* filter =
         (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);
-    if (prev_time_page_not_finish() && prev_value_page_not_finish()) {
+    bool pt = prev_time_page_not_finish();
+    bool pv = prev_value_page_not_finish();
+    if (pt && pv) {
         ret = decode_time_value_buf_into_tsblock(ret_tsblock, oneshoot_filter,
                                                  &pa);
         return ret;
     }
-    if (!prev_time_page_not_finish() && !prev_value_page_not_finish()) {
+    if (!pt && !pv) {
         while (IS_SUCC(ret)) {
             if (RET_FAIL(get_cur_page_header(
                     time_chunk_meta_, time_in_stream_, cur_time_page_header_,
@@ -259,7 +325,8 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta,
                                             common::ByteStream& in_stream,
                                             PageHeader& cur_page_header,
                                             uint32_t& chunk_visit_offset,
-                                            ChunkHeader& chunk_header) {
+                                            ChunkHeader& chunk_header,
+                                            int32_t* override_buf_size) {
     int ret = E_OK;
     bool retry = true;
     int cur_page_header_serialized_size = 0;
@@ -282,7 +349,8 @@ int AlignedChunkReader::get_cur_page_header(ChunkMeta*& chunk_meta,
             retry = false;
             retry_read_want_size += 1024;
             int32_t& file_data_buf_size =
-                chunk_header.data_type_ == common::VECTOR
+                override_buf_size != nullptr ? *override_buf_size
+                : chunk_header.data_type_ == common::VECTOR
                     ? file_data_time_buf_size_
                     : file_data_value_buf_size_;
             // do not shrink buffer for page header, otherwise, the buffer is
@@ -319,16 +387,18 @@ int AlignedChunkReader::read_from_file_and_rewrap(
     int ret = E_OK;
    const int DEFAULT_READ_SIZE = 4096;  // may use page_size + page_header_size
     char* file_data_buf = in_stream_.get_wrapped_buf();
-    int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
+    int64_t offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
     int read_size =
         (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
     if (file_data_buf_size < read_size ||
         (may_shrink && read_size < file_data_buf_size / 10)) {
         file_data_buf = (char*)mem_realloc(file_data_buf, read_size);
         if (IS_NULL(file_data_buf)) {
+            in_stream_.clear_wrapped_buf();
             return E_OOM;
         }
         file_data_buf_size = read_size;
+        in_stream_.wrap_from(file_data_buf, read_size);
     }
     int ret_read_len = 0;
     if (RET_FAIL(
@@ -760,6 +830,9 @@ int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
                                       Filter* oneshoot_filter, PageArena& pa,
                                       int64_t min_time_hint, int& row_offset,
                                       int& row_limit) {
+    if (multi_value_mode_) {
+        return get_next_page_multi(ret_tsblock, oneshoot_filter, pa);
+    }
     int ret = E_OK;
     Filter* filter =
         (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);
@@ -810,4 +883,987 @@ int AlignedChunkReader::get_next_page(TsBlock* ret_tsblock,
     return ret;
 }
 
+// ══════════════════════════════════════════════════════════════════════════
+//  Multi-value AlignedChunkReader implementation
+// ══════════════════════════════════════════════════════════════════════════
+
+int AlignedChunkReader::load_by_aligned_meta_multi(
+    ChunkMeta* time_chunk_meta, const std::vector<ChunkMeta*>& value_metas) {
+    int ret = E_OK;
+    multi_value_mode_ = true;
+    time_chunk_meta_ = time_chunk_meta;
+
+    // ── Load time chunk header ──
+    file_data_time_buf_size_ = 1024;
+    int32_t ret_read_len = 0;
+    char* time_file_data_buf =
+        (char*)mem_alloc(file_data_time_buf_size_, MOD_CHUNK_READER);
+    if (IS_NULL(time_file_data_buf)) return E_OOM;
+
+    ret = read_file_->read(time_chunk_meta_->offset_of_chunk_header_,
+                           time_file_data_buf, file_data_time_buf_size_,
+                           ret_read_len);
+    if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
+        ret = E_TSFILE_CORRUPTED;
+        mem_free(time_file_data_buf);
+        return ret;
+    }
+    if (IS_SUCC(ret)) {
+        time_in_stream_.wrap_from(time_file_data_buf, ret_read_len);
+        if (RET_FAIL(time_chunk_header_.deserialize_from(time_in_stream_))) {
+            return ret;
+        }
+        time_chunk_visit_offset_ = time_in_stream_.read_pos();
+    }
+
+    // Alloc time decoder/compressor
+    if (IS_SUCC(ret)) {
+        if (RET_FAIL(alloc_compressor_and_decoder(
+                time_decoder_, time_compressor_,
+                time_chunk_header_.encoding_type_,
+                time_chunk_header_.data_type_,
+                time_chunk_header_.compression_type_))) {
+            return ret;
+        }
+    }
+
+    // ── Load each value column ──
+    if (value_columns_.size() != value_metas.size()) {
+        for (auto* p : value_columns_) delete p;
+        value_columns_.clear();
+        value_columns_.reserve(value_metas.size());
+        for (size_t c = 0; c < value_metas.size(); c++) {
+            value_columns_.push_back(new ValueColumnState);
+        }
+    }
+    for (size_t c = 0; c < value_metas.size() && IS_SUCC(ret); c++) {
+        auto* col = value_columns_[c];
+        col->chunk_meta = value_metas[c];
+        col->file_data_buf_size = 1024;
+        ret_read_len = 0;
+        char* vbuf =
+            (char*)mem_alloc(col->file_data_buf_size, MOD_CHUNK_READER);
+        if (IS_NULL(vbuf)) return E_OOM;
+
+        ret = read_file_->read(col->chunk_meta->offset_of_chunk_header_, vbuf,
+                               col->file_data_buf_size, ret_read_len);
+        if (IS_SUCC(ret) && ret_read_len < ChunkHeader::MIN_SERIALIZED_SIZE) {
+            ret = E_TSFILE_CORRUPTED;
+            mem_free(vbuf);
+            break;
+        }
+        if (IS_SUCC(ret)) {
+            col->in_stream.wrap_from(vbuf, ret_read_len);
+            if (RET_FAIL(col->chunk_header.deserialize_from(col->in_stream))) {
+                break;
+            }
+            col->chunk_visit_offset = col->in_stream.read_pos();
+            if (RET_FAIL(alloc_compressor_and_decoder(
+                    col->decoder, col->compressor,
+                    col->chunk_header.encoding_type_,
+                    col->chunk_header.data_type_,
+                    col->chunk_header.compression_type_))) {
+                break;
+            }
+        }
+    }
+
+    return ret;
+}
+
+bool AlignedChunkReader::has_more_data_multi() const {
+    if (chunk_level_active_) return true;
+    if (prev_time_page_not_finish() || prev_any_value_page_not_finish_multi()) {
+        return true;
+    }
+    if (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ <
+        time_chunk_header_.data_size_) {
+        return true;
+    }
+    for (const auto* col : value_columns_) {
+        if (col->chunk_visit_offset - col->chunk_header.serialized_size_ <
+            col->chunk_header.data_size_) {
+            return true;
+        }
+    }
+    return false;
+}
+
+bool AlignedChunkReader::prev_any_value_page_not_finish_multi() const {
+    for (const auto* col : value_columns_) {
+        if ((col->decoder && col->decoder->has_remaining(col->in)) ||
+            col->in.has_remaining()) {
+            return true;
+        }
+    }
+    return false;
+}
+
+int AlignedChunkReader::get_next_page_multi(TsBlock* ret_tsblock,
+                                            Filter* oneshoot_filter,
+                                            PageArena& pa) {
+    int ret = E_OK;
+    Filter* filter =
+        (oneshoot_filter != nullptr ? oneshoot_filter : time_filter_);
+
+    // Resume chunk-level scatter from previous E_OVERFLOW.
+    if (chunk_level_active_) {
+        RowAppender row_appender(ret_tsblock);
+        ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa);
+        if (ret != E_OVERFLOW) {
+            cleanup_chunk_decode();
+        } else {
+            ret = E_OK;
+        }
+        return ret;
+    }
+
+#ifdef ENABLE_THREADS
+    // Chunk-level parallel path for multi-page compressed chunks.
+    if (decode_pool_ != nullptr && value_columns_.size() > 1 &&
+        !chunk_has_only_one_page(time_chunk_header_) &&
+        time_chunk_header_.compression_type_ != common::UNCOMPRESSED) {
+        ret = scan_chunk_pages(filter);
+        if (IS_FAIL(ret)) return ret;
+        if (chunk_pages_.empty()) return E_NO_MORE_DATA;
+
+        ret = decode_chunk_pages();
+        if (IS_FAIL(ret)) {
+            cleanup_chunk_decode();
+            return ret;
+        }
+
+        chunk_level_active_ = true;
+        RowAppender row_appender(ret_tsblock);
+        ret = scatter_chunk_pages(ret_tsblock, row_appender, filter, &pa);
+        if (ret != E_OVERFLOW) {
+            cleanup_chunk_decode();
+        } else {
+            ret = E_OK;
+        }
+        return ret;
+    }
+#endif
+
+    // Serial fallback.
+    return get_next_page_multi_serial(ret_tsblock, filter, pa);
+}
+
+int AlignedChunkReader::get_next_page_multi_serial(TsBlock* ret_tsblock,
+                                                   Filter* filter,
+                                                   PageArena& pa) {
+    int ret = E_OK;
+    bool pt = prev_time_page_not_finish();
+    bool pv = prev_any_value_page_not_finish_multi();
+    if (pt && pv) {
+        ret =
+            decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa);
+        return ret;
+    }
+    if (!pt && !pv) {
+        while (IS_SUCC(ret)) {
+            if (RET_FAIL(get_cur_page_header(
+                    time_chunk_meta_, time_in_stream_, cur_time_page_header_,
+                    time_chunk_visit_offset_, time_chunk_header_))) {
+                break;
+            }
+            for (size_t c = 0; c < value_columns_.size() && IS_SUCC(ret); c++) {
+                auto* col = value_columns_[c];
+                if (RET_FAIL(get_cur_page_header(
+                        col->chunk_meta, col->in_stream, col->cur_page_header,
+                        col->chunk_visit_offset, col->chunk_header,
+                        &col->file_data_buf_size))) {
+                }
+            }
+            if (IS_FAIL(ret)) break;
+            if (cur_page_statisify_filter_multi(filter)) break;
+            if (RET_FAIL(skip_cur_page_multi())) break;
+            if (!has_more_data()) {
+                ret = E_NO_MORE_DATA;
+                break;
+            }
+        }
+        if (IS_SUCC(ret)) {
+            ret = decode_cur_time_page_data();
+            if (IS_SUCC(ret)) ret = decode_cur_value_pages_multi();
+        }
+    }
+    if (IS_SUCC(ret)) {
+        ret =
+            decode_time_value_buf_into_tsblock_multi(ret_tsblock, filter, &pa);
+    }
+    return ret;
+}
+
+bool AlignedChunkReader::cur_page_statisify_filter_multi(Filter* filter) {
+    bool time_satisfy = filter == nullptr ||
+                        cur_time_page_header_.statistic_ == nullptr ||
+                        filter->satisfy(cur_time_page_header_.statistic_);
+    return time_satisfy;
+}
+
+int AlignedChunkReader::skip_cur_page_multi() {
+    time_chunk_visit_offset_ += cur_time_page_header_.compressed_size_;
+    time_in_stream_.wrapped_buf_advance_read_pos(
+        cur_time_page_header_.compressed_size_);
+    for (auto* col : value_columns_) {
+        col->chunk_visit_offset += col->cur_page_header.compressed_size_;
+        col->in_stream.wrapped_buf_advance_read_pos(
+            col->cur_page_header.compressed_size_);
+    }
+    return E_OK;
+}
+
+// Decode the current page of every value column.
+// Phase 1 performs the file IO serially (the underlying file reader is not
+// assumed safe for concurrent reads); phase 2 runs the CPU-bound work
+// (decompress + bitmap parse + decoder reset) per column, in parallel when
+// a decode pool is available.
+// Returns E_OK, or the first per-column failure code.
+int AlignedChunkReader::decode_cur_value_pages_multi() {
+    int ret = E_OK;
+    // Phase 1: Serial IO — ensure each column's page data is in memory.
+    for (size_t c = 0; c < value_columns_.size() && IS_SUCC(ret); c++) {
+        ret = ensure_value_page_loaded(*value_columns_[c]);
+    }
+    if (IS_FAIL(ret)) return ret;
+
+    // Phase 2: Parallel CPU — decompress + parse bitmap + reset decoder.
+#ifdef ENABLE_THREADS
+    if (value_columns_.size() > 1 && decode_pool_ != nullptr) {
+        std::vector<int> col_rets(value_columns_.size(), E_OK);
+        for (size_t c = 0; c < value_columns_.size(); c++) {
+            auto* col = value_columns_[c];
+            int* col_ret = &col_rets[c];
+            // `this` must be captured: decompress_and_parse_value_page is a
+            // member function of this reader.
+            decode_pool_->submit([this, col, col_ret] {
+                *col_ret = decompress_and_parse_value_page(*col);
+            });
+        }
+        decode_pool_->wait_all();
+        // Surface the first per-column failure, if any.
+        for (size_t c = 0; c < col_rets.size(); c++) {
+            if (IS_FAIL(col_rets[c])) return col_rets[c];
+        }
+    } else
+#endif
+    {
+        for (size_t c = 0; c < value_columns_.size() && IS_SUCC(ret); c++) {
+            ret = decompress_and_parse_value_page(*value_columns_[c]);
+        }
+    }
+    return ret;
+}
+
+// Ensure the compressed bytes of `col`'s current page are fully buffered in
+// col.in_stream, refilling from the file when the buffered remainder cannot
+// cover the whole page.
+// Returns E_OK, or the error produced by read_from_file_and_rewrap.
+int AlignedChunkReader::ensure_value_page_loaded(ValueColumnState& col) {
+    int ret = E_OK;
+    // Refill only when the stream does not already hold the full page.
+    if (col.in_stream.remaining_size() < col.cur_page_header.compressed_size_) {
+        if (RET_FAIL(read_from_file_and_rewrap(
+                col.in_stream, col.chunk_meta, col.chunk_visit_offset,
+                col.file_data_buf_size,
+                col.cur_page_header.compressed_size_))) {
+            return ret;
+        }
+    }
+    return ret;
+}
+
+// Decompress the current page of `col` and parse its serialized layout:
+//   [uint32 data_num][not-null bitmap, (data_num+7)/8 bytes][encoded values]
+// On success col.in wraps the encoded value bytes, col.notnull_bitmap holds
+// a copy of the bitmap, col.cur_value_index is rewound to -1, and
+// col.uncompressed_buf owns the buffer (released later through
+// compressor->after_uncompress).
+// Returns E_OK, a compressor error, or E_TSFILE_CORRUPTED when the actual
+// uncompressed size disagrees with the page header.
+int AlignedChunkReader::decompress_and_parse_value_page(ValueColumnState& col) {
+    int ret = E_OK;
+
+    // Empty page: expose an empty value stream and decode nothing.
+    if (col.cur_page_header.compressed_size_ == 0) {
+        col.in.wrap_from(nullptr, 0);
+        return E_OK;
+    }
+
+    // Decompress
+    char* compressed_buf =
+        col.in_stream.get_wrapped_buf() + col.in_stream.read_pos();
+    uint32_t compressed_size = col.cur_page_header.compressed_size_;
+    // Consume the compressed bytes from the stream before decompressing.
+    col.in_stream.wrapped_buf_advance_read_pos(compressed_size);
+    col.chunk_visit_offset += compressed_size;
+
+    char* uncompressed_buf = nullptr;
+    uint32_t uncompressed_size = 0;
+    if (RET_FAIL(col.compressor->reset(false))) {
+        return ret;
+    }
+    if (RET_FAIL(col.compressor->uncompress(compressed_buf, compressed_size,
+                                            uncompressed_buf,
+                                            uncompressed_size))) {
+        return ret;
+    }
+    // Record ownership before the corruption check so the buffer is still
+    // released by the caller's cleanup path on the error return below.
+    col.uncompressed_buf = uncompressed_buf;
+
+    if (uncompressed_size != col.cur_page_header.uncompressed_size_) {
+        return E_TSFILE_CORRUPTED;
+    }
+
+    // Parse bitmap + value data
+    uint32_t offset = 0;
+    uint32_t data_num = SerializationUtil::read_ui32(uncompressed_buf);
+    offset += sizeof(uint32_t);
+    col.notnull_bitmap.resize((data_num + 7) / 8);
+    for (size_t i = 0; i < col.notnull_bitmap.size(); i++) {
+        col.notnull_bitmap[i] = *(uncompressed_buf + offset);
+        offset++;
+    }
+    // Rewind the per-page row cursor; it indexes into the bitmap.
+    col.cur_value_index = -1;
+
+    char* value_buf = uncompressed_buf + offset;
+    uint32_t value_buf_size = uncompressed_size - offset;
+    col.decoder->reset();
+    col.in.wrap_from(value_buf, value_buf_size);
+    return ret;
+}
+
+// Drain the decoded time/value page buffers into `ret_tsblock`.
+// When the pages are fully consumed, releases the uncompressed buffers and
+// the per-column scratch state. When the TsBlock fills first (E_OVERFLOW
+// from the batch decoder), keeps all state so the next call resumes where
+// this one stopped, and reports E_OK to the caller.
+int AlignedChunkReader::decode_time_value_buf_into_tsblock_multi(
+    TsBlock*& ret_tsblock, Filter* filter, PageArena* pa) {
+    int ret = E_OK;
+    RowAppender row_appender(ret_tsblock);
+    ret = multi_DECODE_TV_BATCH(ret_tsblock, row_appender, filter, pa);
+
+    // Release uncompressed buffers if pages are done
+    if (ret != E_OVERFLOW) {
+        if (time_uncompressed_buf_ != nullptr) {
+            time_compressor_->after_uncompress(time_uncompressed_buf_);
+            time_uncompressed_buf_ = nullptr;
+        }
+        for (auto* col : value_columns_) {
+            if (col->uncompressed_buf != nullptr) {
+                col->compressor->after_uncompress(col->uncompressed_buf);
+                col->uncompressed_buf = nullptr;
+            }
+            // Reset the value stream only once nothing remains to decode.
+            if (!(col->decoder && col->decoder->has_remaining(col->in)) &&
+                !col->in.has_remaining()) {
+                col->in.reset();
+            }
+            col->notnull_bitmap.clear();
+            col->notnull_bitmap.shrink_to_fit();
+        }
+        if (!prev_time_page_not_finish()) {
+            time_in_.reset();
+        }
+    } else {
+        // A full TsBlock is not an error from the caller's perspective.
+        ret = E_OK;
+    }
+    return ret;
+}
+
+// Batch-decode timestamps and per-column values from the decompressed page
+// buffers into `ret_tsblock`.
+// Each iteration: decode up to BATCH timestamps, evaluate the time filter,
+// compute each column's null mask from its not-null bitmap and decode the
+// non-null values, then scatter rows into the block. A bulk-append fast
+// path is taken when every row passes the filter and no column has nulls.
+// Returns E_OK when the buffers are exhausted, E_OVERFLOW when the TsBlock
+// cannot hold another full batch, or a decode error.
+int AlignedChunkReader::multi_DECODE_TV_BATCH(TsBlock* ret_tsblock,
+                                              RowAppender& row_appender,
+                                              Filter* filter, PageArena* pa) {
+    int ret = E_OK;
+    const int BATCH = 129;
+    int64_t times[BATCH];
+    const uint32_t null_mask_base = 1 << 7;  // bitmap is probed MSB-first
+    const uint32_t num_cols = value_columns_.size();
+
+    while (time_decoder_->has_remaining(time_in_)) {
+        // Conservative check: require room for a whole batch up front so the
+        // fast path never has to handle a partially-full block.
+        if (row_appender.remaining() < (uint32_t)BATCH) {
+            ret = E_OVERFLOW;
+            break;
+        }
+
+        // ── Phase 1: Decode a batch of timestamps ──
+        int time_count = 0;
+        if (RET_FAIL(time_decoder_->read_batch_int64(times, BATCH, time_count,
+                                                     time_in_))) {
+            break;
+        }
+        if (time_count == 0) break;
+
+        // ── Phase 2: Apply time filter ──
+        bool time_mask[BATCH];
+        bool block_all_pass = (filter == nullptr);
+        int pass_count = time_count;
+        if (!block_all_pass) {
+            pass_count =
+                filter->satisfy_batch_time(times, time_count, time_mask);
+        }
+
+        // ── Phase 3: Per-column null check + value decode ──
+        // Scratch state for one column's slice of this batch. val_buf is
+        // sized for the largest fixed-width element (8 bytes).
+        struct ColBatch {
+            bool is_null[BATCH];
+            int nonnull_count;
+            char val_buf[BATCH * 8];
+            int val_count;
+        };
+        std::vector<ColBatch> col_batches(num_cols);
+
+        for (uint32_t c = 0; c < num_cols; c++) {
+            auto* col = value_columns_[c];
+            auto& cb = col_batches[c];
+            cb.nonnull_count = 0;
+            cb.val_count = 0;
+            // An empty bitmap means the page carries no values: all null.
+            for (int i = 0; i < time_count; i++) {
+                int vi = col->cur_value_index + 1 + i;
+                if (col->notnull_bitmap.empty() ||
+                    ((col->notnull_bitmap[vi / 8] & 0xFF) &
+                     (null_mask_base >> (vi % 8))) == 0) {
+                    cb.is_null[i] = true;
+                } else {
+                    cb.is_null[i] = false;
+                    cb.nonnull_count++;
+                }
+            }
+
+            // Skip values if no rows pass time filter
+            if (pass_count == 0 && cb.nonnull_count > 0) {
+                switch (col->chunk_header.data_type_) {
+                    case common::BOOLEAN: {
+                        // BOOLEAN has no bulk skip; drain one at a time.
+                        for (int s = 0; s < cb.nonnull_count; s++) {
+                            bool dummy;
+                            col->decoder->read_boolean(dummy, col->in);
+                        }
+                        break;
+                    }
+                    case common::INT32:
+                    case common::DATE: {
+                        int sk = 0;
+                        col->decoder->skip_int32(cb.nonnull_count, sk, col->in);
+                        break;
+                    }
+                    case common::INT64:
+                    case common::TIMESTAMP: {
+                        int sk = 0;
+                        col->decoder->skip_int64(cb.nonnull_count, sk, col->in);
+                        break;
+                    }
+                    case common::FLOAT: {
+                        int sk = 0;
+                        col->decoder->skip_float(cb.nonnull_count, sk, col->in);
+                        break;
+                    }
+                    case common::DOUBLE: {
+                        int sk = 0;
+                        col->decoder->skip_double(cb.nonnull_count, sk,
+                                                  col->in);
+                        break;
+                    }
+                    default:
+                        break;
+                }
+                // Nothing buffered: the decoder was advanced past the batch.
+                cb.nonnull_count = 0;
+            }
+
+            // Decode non-null values
+            if (cb.nonnull_count > 0) {
+                switch (col->chunk_header.data_type_) {
+                    case common::BOOLEAN: {
+                        bool* out = reinterpret_cast<bool*>(cb.val_buf);
+                        cb.val_count = 0;
+                        for (int s = 0; s < cb.nonnull_count; s++) {
+                            bool v;
+                            if (col->decoder->read_boolean(v, col->in) !=
+                                common::E_OK)
+                                break;
+                            out[cb.val_count++] = v;
+                        }
+                        break;
+                    }
+                    case common::INT32:
+                    case common::DATE:
+                        col->decoder->read_batch_int32(
+                            reinterpret_cast<int32_t*>(cb.val_buf),
+                            cb.nonnull_count, cb.val_count, col->in);
+                        break;
+                    case common::INT64:
+                    case common::TIMESTAMP:
+                        col->decoder->read_batch_int64(
+                            reinterpret_cast<int64_t*>(cb.val_buf),
+                            cb.nonnull_count, cb.val_count, col->in);
+                        break;
+                    case common::FLOAT:
+                        col->decoder->read_batch_float(
+                            reinterpret_cast<float*>(cb.val_buf),
+                            cb.nonnull_count, cb.val_count, col->in);
+                        break;
+                    case common::DOUBLE:
+                        col->decoder->read_batch_double(
+                            reinterpret_cast<double*>(cb.val_buf),
+                            cb.nonnull_count, cb.val_count, col->in);
+                        break;
+                    default:
+                        break;
+                }
+            }
+        }
+
+        // ── Phase 4: Skip if no rows pass ──
+        if (pass_count == 0) {
+            for (uint32_t c = 0; c < num_cols; c++) {
+                value_columns_[c]->cur_value_index += time_count;
+            }
+            continue;
+        }
+
+        // ── Phase 5: Scatter into TsBlock ──
+
+        // Fast path: all rows pass filter AND all columns have no nulls
+        if (pass_count == time_count) {
+            bool all_nonnull = true;
+            for (uint32_t c = 0; c < num_cols; c++) {
+                if (col_batches[c].nonnull_count != time_count) {
+                    all_nonnull = false;
+                    break;
+                }
+            }
+            if (all_nonnull) {
+                // Bulk memcpy-style append: no per-row bookkeeping needed.
+                common::Vector* time_vec = ret_tsblock->get_vector(0);
+                time_vec->get_value_data().append_fixed_value(
+                    (const char*)times,
+                    static_cast<uint32_t>(time_count) * sizeof(int64_t));
+                for (uint32_t c = 0; c < num_cols; c++) {
+                    auto& cb = col_batches[c];
+                    auto* col = value_columns_[c];
+                    uint32_t elem_size = common::get_data_type_size(
+                        col->chunk_header.data_type_);
+                    common::Vector* vec = ret_tsblock->get_vector(c + 1);
+                    vec->get_value_data().append_fixed_value(
+                        cb.val_buf,
+                        static_cast<uint32_t>(cb.val_count) * elem_size);
+                    col->cur_value_index += time_count;
+                }
+                row_appender.add_rows(static_cast<uint32_t>(time_count));
+                continue;
+            }
+        }
+
+        // Slow path: per-row scatter
+        // val_idx[c] tracks the next buffered value to emit for column c.
+        std::vector<int> val_idx(num_cols, 0);
+
+        for (int i = 0; i < time_count; i++) {
+            bool passes = block_all_pass || time_mask[i];
+
+            if (!passes) {
+                // Filtered-out row: still advance cursors past its values.
+                for (uint32_t c = 0; c < num_cols; c++) {
+                    value_columns_[c]->cur_value_index++;
+                    if (!col_batches[c].is_null[i]) val_idx[c]++;
+                }
+                continue;
+            }
+
+            if (UNLIKELY(!row_appender.add_row())) {
+                ret = E_OVERFLOW;
+                break;
+            }
+
+            row_appender.append(0, (char*)&times[i], sizeof(int64_t));
+
+            for (uint32_t c = 0; c < num_cols; c++) {
+                value_columns_[c]->cur_value_index++;
+                auto& cb = col_batches[c];
+                auto* col = value_columns_[c];
+
+                if (cb.is_null[i]) {
+                    row_appender.append_null(c + 1);
+                } else {
+                    uint32_t elem_size = common::get_data_type_size(
+                        col->chunk_header.data_type_);
+                    row_appender.append(
+                        c + 1, cb.val_buf + val_idx[c] * elem_size, elem_size);
+                    val_idx[c]++;
+                }
+            }
+        }
+        if (ret != E_OK) break;
+    }
+    return ret;
+}
+
+// ═══════════════════════════════════════════════════════════════════════════
+// Chunk-level parallel decode
+// ═══════════════════════════════════════════════════════════════════════════
+
+// Release every chunk-level decode buffer and reset the chunk decode state,
+// returning the reader to its pre-chunk-decode condition.
+// NOTE(review): these buffers come from Compressor::uncompress (see
+// decode_chunk_pages); confirm mem_free matches after_uncompress's
+// deallocation for every compressor implementation.
+void AlignedChunkReader::cleanup_chunk_decode() {
+    for (auto& col_pages : chunk_cols_) {
+        for (auto& cp : col_pages) {
+            if (cp.uncompressed_buf != nullptr) {
+                common::mem_free(cp.uncompressed_buf);
+                cp.uncompressed_buf = nullptr;
+            }
+        }
+    }
+    chunk_pages_.clear();
+    chunk_times_.clear();
+    chunk_cols_.clear();
+    chunk_page_cursor_ = 0;
+    chunk_level_active_ = false;
+}
+
+// Walk every remaining page of the loaded time/value chunks, recording each
+// page's absolute file offsets and sizes in chunk_pages_ and classifying it
+// against `filter` via the time page statistic. SKIP pages are dropped here
+// and never decoded. Finally sizes the chunk_times_/chunk_cols_ decode
+// tables and rewinds the page cursor.
+int AlignedChunkReader::scan_chunk_pages(Filter* filter) {
+    int ret = E_OK;
+    const uint32_t num_cols = value_columns_.size();
+    chunk_pages_.clear();
+
+    while (IS_SUCC(ret)) {
+        // Stop once the whole time chunk body has been visited.
+        if (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ >=
+            time_chunk_header_.data_size_)
+            break;
+
+        if (RET_FAIL(get_cur_page_header(
+                time_chunk_meta_, time_in_stream_, cur_time_page_header_,
+                time_chunk_visit_offset_, time_chunk_header_)))
+            break;
+        // A zero/zero header means no further pages.
+        if (cur_time_page_header_.compressed_size_ == 0 &&
+            cur_time_page_header_.uncompressed_size_ == 0)
+            break;
+
+        // Read each value column's page header; RET_FAIL stores the error
+        // into `ret`, and the loop condition exits on failure.
+        for (size_t c = 0; c < num_cols && IS_SUCC(ret); c++) {
+            auto* col = value_columns_[c];
+            if (RET_FAIL(get_cur_page_header(
+                    col->chunk_meta, col->in_stream, col->cur_page_header,
+                    col->chunk_visit_offset, col->chunk_header,
+                    &col->file_data_buf_size))) {
+            }
+        }
+        if (IS_FAIL(ret)) break;
+
+        // Classify the page by its time statistic: fully skippable, fully
+        // inside the filter range, or straddling the boundary.
+        Statistic* stat = cur_time_page_header_.statistic_;
+        PagePassType pt;
+        if (filter == nullptr || stat == nullptr) {
+            pt = PagePassType::FULL_PASS;
+        } else if (!filter->satisfy(stat)) {
+            pt = PagePassType::SKIP;
+        } else if (filter->contain_start_end_time(stat->start_time_,
+                                                  stat->end_time_)) {
+            pt = PagePassType::FULL_PASS;
+        } else {
+            pt = PagePassType::BOUNDARY;
+        }
+
+        if (pt != PagePassType::SKIP) {
+            // Record absolute file offsets so decode_chunk_pages can read
+            // the page bodies directly, bypassing the buffered streams.
+            ChunkPageInfo info;
+            info.pass_type = pt;
+            info.time_file_offset = time_chunk_meta_->offset_of_chunk_header_ +
+                                    time_chunk_visit_offset_;
+            info.time_compressed_size = cur_time_page_header_.compressed_size_;
+            info.time_uncompressed_size =
+                cur_time_page_header_.uncompressed_size_;
+            info.value_file_offsets.resize(num_cols);
+            info.value_compressed_sizes.resize(num_cols);
+            info.value_uncompressed_sizes.resize(num_cols);
+            for (size_t c = 0; c < num_cols; c++) {
+                auto* col = value_columns_[c];
+                info.value_file_offsets[c] =
+                    col->chunk_meta->offset_of_chunk_header_ +
+                    col->chunk_visit_offset;
+                info.value_compressed_sizes[c] =
+                    col->cur_page_header.compressed_size_;
+                info.value_uncompressed_sizes[c] =
+                    col->cur_page_header.uncompressed_size_;
+            }
+            chunk_pages_.push_back(std::move(info));
+        }
+
+        // Advance past this page in every stream, whether kept or skipped.
+        time_chunk_visit_offset_ += cur_time_page_header_.compressed_size_;
+        time_in_stream_.wrapped_buf_advance_read_pos(
+            cur_time_page_header_.compressed_size_);
+        for (size_t c = 0; c < num_cols; c++) {
+            auto* col = value_columns_[c];
+            col->chunk_visit_offset += col->cur_page_header.compressed_size_;
+            col->in_stream.wrapped_buf_advance_read_pos(
+                col->cur_page_header.compressed_size_);
+        }
+    }
+
+    // Size the decode tables: one time slot per page, one value slot per
+    // (column, page) pair.
+    const size_t np = chunk_pages_.size();
+    chunk_times_.resize(np);
+    chunk_cols_.resize(num_cols);
+    for (uint32_t c = 0; c < num_cols; c++) chunk_cols_[c].resize(np);
+    chunk_page_cursor_ = 0;
+    return ret;
+}
+
+// Decode every page that survived scan_chunk_pages: the time column
+// serially, then each value column — in parallel when a decode pool is
+// available. Results land in chunk_times_ / chunk_cols_ for
+// scatter_chunk_pages to consume.
+// Returns E_OK, an IO/OOM error, or E_TSFILE_CORRUPTED on size mismatch.
+int AlignedChunkReader::decode_chunk_pages() {
+    int ret = E_OK;
+    const size_t np = chunk_pages_.size();
+    const uint32_t num_cols = value_columns_.size();
+    if (np == 0) return ret;
+
+    // Read `size` bytes at `offset` into the caller's stack buffer when it
+    // fits, otherwise into a heap buffer (`heap` tells the caller which).
+    // On read failure the heap buffer is freed here so callers never leak.
+    auto file_read_page = [&](int64_t offset, uint32_t size, char* stack,
+                              uint32_t stack_sz, char*& out,
+                              bool& heap) -> int {
+        heap = size > stack_sz;
+        out =
+            heap ? (char*)common::mem_alloc(size, common::MOD_DEFAULT) : stack;
+        if (!out) return common::E_OOM;
+        int rlen = 0;
+        int read_ret = read_file_->read(offset, out, size, rlen);
+        if (read_ret != E_OK && heap) {
+            // Fix: previously the heap buffer leaked on a failed read.
+            common::mem_free(out);
+            out = nullptr;
+        }
+        return read_ret;
+    };
+
+    // ── Time column (serial) ──
+    for (size_t p = 0; p < np; p++) {
+        auto& info = chunk_pages_[p];
+        auto& td = chunk_times_[p];
+        td.count = 0;
+        td.cursor = 0;
+        if (info.time_compressed_size == 0) continue;
+
+        char stk[4096];
+        char* cbuf;
+        bool heap;
+        if (RET_FAIL(file_read_page(info.time_file_offset,
+                                    info.time_compressed_size, stk, sizeof(stk),
+                                    cbuf, heap)))
+            return ret;
+
+        char* ub = nullptr;
+        uint32_t us = 0;
+        time_compressor_->reset(false);
+        int r = time_compressor_->uncompress(cbuf, info.time_compressed_size,
+                                             ub, us);
+        // Uncompress may return the input buffer (no-op codec); only free
+        // the compressed buffer when it is a distinct heap allocation.
+        if (heap && cbuf != ub) common::mem_free(cbuf);
+        if (r != E_OK || us != info.time_uncompressed_size) {
+            if (ub) time_compressor_->after_uncompress(ub);
+            return E_TSFILE_CORRUPTED;
+        }
+
+        // Decode every timestamp of the page into td.times.
+        common::ByteStream ts_in;
+        ts_in.wrap_from(ub, us);
+        time_decoder_->reset();
+        td.times.clear();
+        const int BS = 1024;
+        int64_t buf[BS];
+        while (time_decoder_->has_remaining(ts_in)) {
+            int actual = 0;
+            time_decoder_->read_batch_int64(buf, BS, actual, ts_in);
+            if (actual == 0) break;
+            td.times.insert(td.times.end(), buf, buf + actual);
+        }
+        td.count = (int)td.times.size();
+        time_compressor_->after_uncompress(ub);
+    }
+
+    // ── Value column decode lambda ──
+    // Decodes all pages of one column. Runs on a pool thread in the
+    // parallel path; it only touches that column's state plus read_file_.
+    // NOTE(review): this issues read_file_->read from pool threads —
+    // confirm the reader supports concurrent positional reads.
+    auto decode_val_col = [&](uint32_t c) -> int {
+        auto* col = value_columns_[c];
+        for (size_t p = 0; p < np; p++) {
+            auto& info = chunk_pages_[p];
+            auto& cp = chunk_cols_[c][p];
+            cp.data_num = 0;
+            cp.nonnull_count = 0;
+            cp.read_pos = 0;
+            cp.uncompressed_buf = nullptr;
+            uint32_t csz = info.value_compressed_sizes[c];
+            if (csz == 0) continue;
+
+            char stk[4096];
+            char* cbuf;
+            bool heap;
+            int r = E_OK;
+            {
+                heap = csz > sizeof(stk);
+                cbuf = heap ? (char*)common::mem_alloc(csz, common::MOD_DEFAULT)
+                            : stk;
+                if (!cbuf) return common::E_OOM;
+                int rlen = 0;
+                r = read_file_->read(info.value_file_offsets[c], cbuf, csz,
+                                     rlen);
+            }
+            if (r != E_OK) {
+                if (heap) common::mem_free(cbuf);
+                return r;
+            }
+
+            char* ub = nullptr;
+            uint32_t us = 0;
+            col->compressor->reset(false);
+            r = col->compressor->uncompress(cbuf, csz, ub, us);
+            if (heap && cbuf != ub) common::mem_free(cbuf);
+            if (r != E_OK || us != info.value_uncompressed_sizes[c]) {
+                if (ub) col->compressor->after_uncompress(ub);
+                return E_TSFILE_CORRUPTED;
+            }
+            cp.uncompressed_buf = ub;
+
+            // Page layout: [uint32 data_num][not-null bitmap][values].
+            uint32_t off = 0;
+            uint32_t data_num = SerializationUtil::read_ui32(ub);
+            off += sizeof(uint32_t);
+            cp.data_num = data_num;
+            cp.bitmap.resize((data_num + 7) / 8);
+            for (size_t i = 0; i < cp.bitmap.size(); i++)
+                cp.bitmap[i] = *(ub + off++);
+
+            char* vbuf = ub + off;
+            uint32_t vsize = us - off;
+            col->decoder->reset();
+            common::ByteStream vi;
+            vi.wrap_from(vbuf, vsize);
+
+            // Variable-length types are not pre-decoded at chunk level.
+            auto dt = col->chunk_header.data_type_;
+            if (dt == common::STRING || dt == common::TEXT ||
+                dt == common::BLOB) {
+                cp.nonnull_count = 0;
+                continue;
+            }
+            // Count set bits (MSB-first) to size the value buffer.
+            const uint32_t nmb = 1 << 7;
+            int nn = 0;
+            for (uint32_t i = 0; i < data_num; i++)
+                if (!cp.bitmap.empty() &&
+                    ((cp.bitmap[i / 8] & 0xFF) & (nmb >> (i % 8))) != 0)
+                    nn++;
+            if (nn == 0) {
+                cp.nonnull_count = 0;
+                continue;
+            }
+            uint32_t es = common::get_data_type_size(dt);
+            cp.values.resize((size_t)nn * es);
+            cp.nonnull_count = 0;
+            switch (dt) {
+                case common::BOOLEAN: {
+                    bool* out = reinterpret_cast<bool*>(cp.values.data());
+                    for (int s = 0; s < nn; s++) {
+                        bool v;
+                        if (col->decoder->read_boolean(v, vi) != E_OK) break;
+                        out[cp.nonnull_count++] = v;
+                    }
+                    break;
+                }
+                case common::INT32:
+                case common::DATE:
+                    col->decoder->read_batch_int32(
+                        reinterpret_cast<int32_t*>(cp.values.data()), nn,
+                        cp.nonnull_count, vi);
+                    break;
+                case common::INT64:
+                case common::TIMESTAMP:
+                    col->decoder->read_batch_int64(
+                        reinterpret_cast<int64_t*>(cp.values.data()), nn,
+                        cp.nonnull_count, vi);
+                    break;
+                case common::FLOAT:
+                    col->decoder->read_batch_float(
+                        reinterpret_cast<float*>(cp.values.data()), nn,
+                        cp.nonnull_count, vi);
+                    break;
+                case common::DOUBLE:
+                    col->decoder->read_batch_double(
+                        reinterpret_cast<double*>(cp.values.data()), nn,
+                        cp.nonnull_count, vi);
+                    break;
+                default:
+                    break;
+            }
+        }
+        return E_OK;
+    };
+
+#ifdef ENABLE_THREADS
+    if (decode_pool_ != nullptr) {
+        std::vector<int> col_rets(num_cols, E_OK);
+        for (uint32_t c = 0; c < num_cols; c++)
+            decode_pool_->submit([&, c]() { col_rets[c] = decode_val_col(c); });
+        decode_pool_->wait_all();
+        for (uint32_t c = 0; c < num_cols; c++)
+            if (col_rets[c] != E_OK) return col_rets[c];
+        return ret;
+    }
+#endif
+    for (uint32_t c = 0; c < num_cols && IS_SUCC(ret); c++)
+        ret = decode_val_col(c);
+    return ret;
+}
+
+// Scatter the pre-decoded chunk pages (chunk_times_ / chunk_cols_) into
+// `ret_tsblock`, resuming from chunk_page_cursor_ / per-page cursors across
+// calls. FULL_PASS pages with fixed-width, fully non-null columns take a
+// bulk-append fast path; BOUNDARY pages and pages with nulls fall back to
+// per-row scatter with an exact time-filter check.
+// Returns E_OK when all pages are consumed, or E_OVERFLOW when the TsBlock
+// fills (state is preserved for the next call).
+int AlignedChunkReader::scatter_chunk_pages(TsBlock* ret_tsblock,
+                                            RowAppender& row_appender,
+                                            Filter* filter, PageArena* pa) {
+    int ret = E_OK;
+    const uint32_t null_mask_base = 1 << 7;  // bitmap is probed MSB-first
+    const uint32_t num_cols = value_columns_.size();
+    const size_t np = chunk_pages_.size();
+
+    while ((size_t)chunk_page_cursor_ < np) {
+        auto& td = chunk_times_[chunk_page_cursor_];
+        // Page already fully emitted (possibly in a previous call).
+        if (td.cursor >= td.count) {
+            chunk_page_cursor_++;
+            continue;
+        }
+        auto& info = chunk_pages_[chunk_page_cursor_];
+
+        // The bulk path requires: no row-level filtering, fixed-width types
+        // only, and every column fully populated (no nulls).
+        bool need_filter = (info.pass_type == PagePassType::BOUNDARY);
+        bool can_bulk = !need_filter;
+        if (can_bulk) {
+            for (uint32_t c = 0; c < num_cols && can_bulk; c++) {
+                auto& cp = chunk_cols_[c][chunk_page_cursor_];
+                auto dt = value_columns_[c]->chunk_header.data_type_;
+                if (dt == common::STRING || dt == common::TEXT ||
+                    dt == common::BLOB)
+                    can_bulk = false;
+                else if (cp.data_num == 0)
+                    can_bulk = false;
+                else if (cp.nonnull_count != (int)cp.data_num)
+                    can_bulk = false;
+            }
+        }
+
+        if (can_bulk) {
+            // Fast path: append rows in block-sized slices straight from the
+            // pre-decoded arrays.
+            while (td.cursor < td.count) {
+                int avail = (int)row_appender.remaining();
+                if (avail <= 0) return E_OVERFLOW;
+                int batch = std::min(td.count - td.cursor, avail);
+
+                ret_tsblock->get_vector(0)->get_value_data().append_fixed_value(
+                    (const char*)&td.times[td.cursor],
+                    static_cast<uint32_t>(batch) * sizeof(int64_t));
+                for (uint32_t c = 0; c < num_cols; c++) {
+                    auto& cp = chunk_cols_[c][chunk_page_cursor_];
+                    uint32_t es = common::get_data_type_size(
+                        value_columns_[c]->chunk_header.data_type_);
+                    ret_tsblock->get_vector(c + 1)
+                        ->get_value_data()
+                        .append_fixed_value(
+                            cp.values.data() +
+                                static_cast<size_t>(cp.read_pos) * es,
+                            static_cast<uint32_t>(batch) * es);
+                    cp.read_pos += batch;
+                }
+                row_appender.add_rows(static_cast<uint32_t>(batch));
+                td.cursor += batch;
+            }
+        } else {
+            // Slow path: per-row filter check, null handling, and scatter.
+            while (td.cursor < td.count) {
+                if (row_appender.remaining() == 0) return E_OVERFLOW;
+                int64_t t = td.times[td.cursor];
+
+                if (need_filter && filter != nullptr &&
+                    !filter->satisfy_start_end_time(t, t)) {
+                    // Filtered-out row: still consume its non-null values so
+                    // read_pos stays aligned with the bitmap.
+                    for (uint32_t c = 0; c < num_cols; c++) {
+                        auto& cp = chunk_cols_[c][chunk_page_cursor_];
+                        if (cp.data_num > 0 && !cp.bitmap.empty()) {
+                            int vi = td.cursor;
+                            if ((cp.bitmap[vi / 8] & 0xFF) &
+                                (null_mask_base >> (vi % 8)))
+                                cp.read_pos++;
+                        }
+                    }
+                    td.cursor++;
+                    continue;
+                }
+
+                if (UNLIKELY(!row_appender.add_row())) return E_OVERFLOW;
+                row_appender.append(0, (char*)&t, sizeof(int64_t));
+
+                for (uint32_t c = 0; c < num_cols; c++) {
+                    auto& cp = chunk_cols_[c][chunk_page_cursor_];
+                    int vi = td.cursor;
+                    bool is_null = true;
+                    if (cp.data_num > 0 && !cp.bitmap.empty()) {
+                        is_null = ((cp.bitmap[vi / 8] & 0xFF) &
+                                   (null_mask_base >> (vi % 8))) == 0;
+                    }
+                    if (is_null) {
+                        row_appender.append_null(c + 1);
+                    } else {
+                        uint32_t es = common::get_data_type_size(
+                            value_columns_[c]->chunk_header.data_type_);
+                        row_appender.append(
+                            c + 1,
+                            cp.values.data() +
+                                static_cast<size_t>(cp.read_pos) * es,
+                            es);
+                        cp.read_pos++;
+                    }
+                }
+                td.cursor++;
+            }
+        }
+        chunk_page_cursor_++;
+    }
+    return ret;
+}
+
 }  // end namespace storage
\ No newline at end of file
diff --git a/cpp/src/reader/aligned_chunk_reader.h 
b/cpp/src/reader/aligned_chunk_reader.h
index 91281215..393e30be 100644
--- a/cpp/src/reader/aligned_chunk_reader.h
+++ b/cpp/src/reader/aligned_chunk_reader.h
@@ -28,8 +28,66 @@
 #include "reader/filter/filter.h"
 #include "reader/ichunk_reader.h"
 
+#ifdef ENABLE_THREADS
+namespace common {
+class ThreadPool;
+}
+#endif
+
 namespace storage {
 
+// Page classification for chunk-level parallel decode.
+//   SKIP      — the time page statistic fails the filter; page not decoded.
+//   FULL_PASS — no filter / no statistic, or the filter contains the page's
+//               whole [start_time, end_time]; every row qualifies.
+//   BOUNDARY  — page overlaps the filter boundary; rows are tested
+//               individually during scatter.
+enum class PagePassType { SKIP, FULL_PASS, BOUNDARY };
+
+// Metadata collected per page during the chunk scan phase.
+struct ChunkPageInfo {
+    PagePassType pass_type = PagePassType::SKIP;
+    int64_t time_file_offset = 0;  // absolute file offset of time page body
+    uint32_t time_compressed_size = 0;
+    uint32_t time_uncompressed_size = 0;
+    // Parallel arrays, one entry per value column (value_columns_ order).
+    std::vector<int64_t> value_file_offsets;
+    std::vector<uint32_t> value_compressed_sizes;
+    std::vector<uint32_t> value_uncompressed_sizes;
+};
+
+// Pre-decoded timestamps for one page (chunk-level decode).
+struct PageTimesDecoded {
+    std::vector<int64_t> times;
+    int count = 0;   // number of decoded timestamps in `times`
+    int cursor = 0;  // next timestamp to scatter into a TsBlock
+};
+
+// Pre-decoded values for one (column, page) pair (chunk-level decode).
+struct ColPageDecoded {
+    std::vector<char> values;     // packed non-null fixed-width values
+    std::vector<uint8_t> bitmap;  // not-null bitmap, MSB-first within a byte
+    uint32_t data_num = 0;        // logical row count of the page
+    int nonnull_count = 0;        // number of decoded values in `values`
+    int read_pos = 0;             // next value index to scatter
+    char* uncompressed_buf = nullptr;  // owned; freed in cleanup_chunk_decode
+};
+
+// Per-value-column state for multi-value AlignedChunkReader.
+struct ValueColumnState {
+    ChunkMeta* chunk_meta = nullptr;
+    ChunkHeader chunk_header;
+    Decoder* decoder = nullptr;
+    Compressor* compressor = nullptr;
+    common::ByteStream in_stream;  // buffered compressed chunk bytes
+    common::ByteStream in;         // current page's encoded value bytes
+    char* uncompressed_buf = nullptr;  // released via after_uncompress
+    int32_t file_data_buf_size = 0;
+    uint32_t chunk_visit_offset = 0;   // bytes visited within the chunk body
+    PageHeader cur_page_header;
+    std::vector<uint8_t> notnull_bitmap;  // current page's not-null bitmap
+    int32_t cur_value_index = -1;  // last consumed row index; -1 = none yet
+
+    // NOTE(review): the predecoded_* fields are not referenced in the
+    // visible translation unit — confirm their users before relying on them.
+    std::vector<char> predecoded_values;
+    int predecoded_count = 0;
+    int predecoded_read_pos = 0;
+    bool predecoded = false;
+};
+
 class AlignedChunkReader : public IChunkReader {
    public:
     AlignedChunkReader()
@@ -64,11 +122,13 @@ class AlignedChunkReader : public IChunkReader {
     ~AlignedChunkReader() override = default;
 
     bool has_more_data() const override {
-        return prev_value_page_not_finish() ||
+        if (multi_value_mode_) {
+            return has_more_data_multi();
+        }
+        return prev_value_page_not_finish() || prev_time_page_not_finish() ||
                (value_chunk_visit_offset_ -
                     value_chunk_header_.serialized_size_ <
                 value_chunk_header_.data_size_) ||
-               prev_time_page_not_finish() ||
                (time_chunk_visit_offset_ - time_chunk_header_.serialized_size_ 
<
                 time_chunk_header_.data_size_);
     }
@@ -76,13 +136,33 @@ class AlignedChunkReader : public IChunkReader {
     int load_by_aligned_meta(ChunkMeta* time_meta,
                              ChunkMeta* value_meta) override;
 
+    // Multi-value: load one time chunk + N value chunks.
+    int load_by_aligned_meta_multi(ChunkMeta* time_meta,
+                                   const std::vector<ChunkMeta*>& value_metas);
+
     int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
                       common::PageArena& pa) override;
-
     int get_next_page(common::TsBlock* tsblock, Filter* oneshoot_filter,
                       common::PageArena& pa, int64_t min_time_hint,
                       int& row_offset, int& row_limit) override;
 
+    uint32_t get_value_column_count() const {
+        return multi_value_mode_ ? value_columns_.size() : 1;
+    }
+
+    ChunkHeader& get_value_chunk_header(uint32_t col) {
+        if (multi_value_mode_ && col < value_columns_.size()) {
+            return value_columns_[col]->chunk_header;
+        }
+        return value_chunk_header_;
+    }
+
+    bool is_multi_value_mode() const { return multi_value_mode_; }
+
+#ifdef ENABLE_THREADS
+    void set_decode_pool(common::ThreadPool* pool) { decode_pool_ = pool; }
+#endif
+
    private:
     bool should_skip_page_by_time(int64_t min_time_hint);
     bool should_skip_page_by_offset(int& row_offset);
@@ -100,7 +180,8 @@ class AlignedChunkReader : public IChunkReader {
                             common::ByteStream& in_stream_,
                             PageHeader& cur_page_header_,
                             uint32_t& chunk_visit_offset,
-                            ChunkHeader& chunk_header);
+                            ChunkHeader& chunk_header,
+                            int32_t* override_buf_size = nullptr);
     int read_from_file_and_rewrap(common::ByteStream& in_stream_,
                                   ChunkMeta*& chunk_meta,
                                   uint32_t& chunk_visit_offset,
@@ -114,6 +195,7 @@ class AlignedChunkReader : public IChunkReader {
                                            Filter* filter,
                                            common::PageArena* pa);
     bool prev_time_page_not_finish() const {
+        if (time_predecoded_) return page_time_cursor_ < page_time_count_;
         return (time_decoder_ && time_decoder_->has_remaining(time_in_)) ||
                time_in_.has_remaining();
     }
@@ -138,52 +220,88 @@ class AlignedChunkReader : public IChunkReader {
                                             common::PageArena& pa,
                                             Filter* filter);
 
+    // ── Multi-value private methods ──────────────────────────────────────
+    bool has_more_data_multi() const;
+    bool prev_any_value_page_not_finish_multi() const;
+    int get_next_page_multi(common::TsBlock* ret_tsblock,
+                            Filter* oneshoot_filter, common::PageArena& pa);
+    int get_next_page_multi_serial(common::TsBlock* ret_tsblock, Filter* filter,
+                                   common::PageArena& pa);
+    int skip_cur_page_multi();
+    bool cur_page_statisify_filter_multi(Filter* filter);
+    int decode_cur_value_pages_multi();
+    int decode_cur_value_page_data_for(ValueColumnState& col);
+    int ensure_value_page_loaded(ValueColumnState& col);
+    static int decompress_and_parse_value_page(ValueColumnState& col);
+    void predecode_all_timestamps();
+    int decode_time_value_buf_into_tsblock_multi(common::TsBlock*& ret_tsblock,
+                                                 Filter* filter,
+                                                 common::PageArena* pa);
+    int multi_DECODE_TV_BATCH(common::TsBlock* ret_tsblock,
+                              common::RowAppender& row_appender, Filter* filter,
+                              common::PageArena* pa);
+
+    // ── Chunk-level parallel decode methods ─────────────────────────────
+    int scan_chunk_pages(Filter* filter);
+    int decode_chunk_pages();
+    int scatter_chunk_pages(common::TsBlock* tsblock,
+                            common::RowAppender& row_appender, Filter* filter,
+                            common::PageArena* pa);
+    void cleanup_chunk_decode();
+
    private:
     ReadFile* read_file_;
+    // ── Single-value mode fields ─────────────────────────────────────────
     ChunkMeta* time_chunk_meta_;
     ChunkMeta* value_chunk_meta_;
     common::String measurement_name_;
     ChunkHeader time_chunk_header_;
-    // TODO: support reading more than one measurement in AlignedChunkReader.
     ChunkHeader value_chunk_header_;
     PageHeader cur_time_page_header_;
     PageHeader cur_value_page_header_;
 
-    /*
-     * Data reader from file is stored in @in_stream_, and the size
-     * is stored in @file_data_buf_size_. Note, in_stream_.total_size_
-     * is used to limit deserialization, that is why we still have
-     * @file_data_buf_size_.
-     *
-     * Since we may want keep data of current page (and page header
-     * of next page) in memory, we need a byte-size cursor to tell
-     * us which byte we are processing, so we have @chunk_visit_offset_
-     * it refer to position from the start of chunk_header_,
-     * also refer to offset within the chunk (including chunk header).
-     * It advanced by step of a page header or a page tv data.
-     */
-    common::ByteStream time_in_stream_{common::MOD_CHUNK_READER};
-    common::ByteStream value_in_stream_{common::MOD_CHUNK_READER};
+    common::ByteStream time_in_stream_;
+    common::ByteStream value_in_stream_;
     int32_t file_data_time_buf_size_;
     int32_t file_data_value_buf_size_;
     uint32_t time_chunk_visit_offset_;
     uint32_t value_chunk_visit_offset_;
 
-    // Statistic *page_statistic_;
     Compressor* time_compressor_;
     Compressor* value_compressor_;
     Filter* time_filter_;
 
     Decoder* time_decoder_;
     Decoder* value_decoder_;
-    common::ByteStream time_in_{common::MOD_CHUNK_READER};
-    common::ByteStream value_in_{common::MOD_CHUNK_READER};
+    common::ByteStream time_in_;
+    common::ByteStream value_in_;
     char* time_uncompressed_buf_;
     char* value_uncompressed_buf_;
     std::vector<uint8_t> value_page_col_notnull_bitmap_;
     uint32_t value_page_data_num_;
     int32_t cur_value_index;
+
+    // ── Multi-value mode fields ──────────────────────────────────────────
+    bool multi_value_mode_ = false;
+    std::vector<ValueColumnState*> value_columns_;
+
+    // Pre-decoded timestamps for page-level parallel decode.
+    std::vector<int64_t> page_all_times_;
+    int page_time_count_ = 0;
+    int page_time_cursor_ = 0;
+    bool time_predecoded_ = false;
+
+    // ── Chunk-level parallel decode state ────────────────────────────────
+    std::vector<ChunkPageInfo> chunk_pages_;
+    std::vector<PageTimesDecoded> chunk_times_;
+    std::vector<std::vector<ColPageDecoded>> chunk_cols_;
+    int chunk_page_cursor_ = 0;
+    bool chunk_level_active_ = false;
+
+#ifdef ENABLE_THREADS
+    common::ThreadPool* decode_pool_ = nullptr;  // borrowed, not owned
+#endif
 };
 
 }  // end namespace storage
-#endif  // READER_CHUNK_READER_H
+#endif  // READER_CHUNK_ALIGNED_READER_H
diff --git a/cpp/src/reader/filter/filter.h b/cpp/src/reader/filter/filter.h
index f39dddba..1146e463 100644
--- a/cpp/src/reader/filter/filter.h
+++ b/cpp/src/reader/filter/filter.h
@@ -63,6 +63,19 @@ class Filter {
         ASSERT(false);
         return nullptr;
     }
+
+    // Batch time filter: evaluate time filter on an array of timestamps.
+    // Writes true/false into @mask for each element.
+    // Returns the number of elements that passed (mask[i] == true).
+    virtual int satisfy_batch_time(const int64_t* times, int count,
+                                   bool* mask) {
+        int pass = 0;
+        for (int i = 0; i < count; ++i) {
+            mask[i] = satisfy_start_end_time(times[i], times[i]);
+            if (mask[i]) ++pass;
+        }
+        return pass;
+    }
 };
 
 }  // namespace storage
diff --git a/cpp/src/reader/tsfile_series_scan_iterator.cc b/cpp/src/reader/tsfile_series_scan_iterator.cc
index c363d0a4..ae93a1ba 100644
--- a/cpp/src/reader/tsfile_series_scan_iterator.cc
+++ b/cpp/src/reader/tsfile_series_scan_iterator.cc
@@ -19,6 +19,11 @@
 
 #include "reader/tsfile_series_scan_iterator.h"
 
+#include "common/global.h"
+#ifdef ENABLE_THREADS
+#include "common/thread_pool.h"
+#endif
+
 using namespace common;
 
 namespace storage {
@@ -34,6 +39,12 @@ void TsFileSeriesScanIterator::destroy() {
         delete tsblock_;
         tsblock_ = nullptr;
     }
+#ifdef ENABLE_THREADS
+    if (decode_pool_ != nullptr) {
+        delete decode_pool_;
+        decode_pool_ = nullptr;
+    }
+#endif
 }
 
 bool TsFileSeriesScanIterator::should_skip_chunk_by_time(
@@ -72,55 +83,73 @@ int TsFileSeriesScanIterator::get_next(TsBlock*& ret_tsblock, bool alloc,
             while (true) {
                 if (!has_next_chunk()) {
                     return E_NO_MORE_DATA;
+                } else if (is_multi_value_) {
+                    // Multi-value aligned path
+                    ChunkMeta* time_cm = time_chunk_meta_cursor_.get();
+                    std::vector<ChunkMeta*> value_cms;
+                    value_cms.reserve(value_chunk_meta_cursors_.size());
+                    for (auto& cur : value_chunk_meta_cursors_) {
+                        value_cms.push_back(cur.get());
+                    }
+                    advance_to_next_chunk();
+                    if (filter != nullptr && time_cm->statistic_ != nullptr &&
+                        !filter->satisfy(time_cm->statistic_)) {
+                        continue;
+                    }
+                    if (should_skip_chunk_by_time(time_cm, min_time_hint)) {
+                        continue;
+                    }
+                    chunk_reader_->reset();
+                    auto* acr = static_cast<AlignedChunkReader*>(chunk_reader_);
+                    if (RET_FAIL(acr->load_by_aligned_meta_multi(time_cm,
+                                                                 value_cms))) {
+                    }
+                    break;
+                } else if (!is_aligned_) {
+                    ChunkMeta* cm = get_current_chunk_meta();
+                    advance_to_next_chunk();
+                    if (filter != nullptr && cm->statistic_ != nullptr &&
+                        !filter->satisfy(cm->statistic_)) {
+                        continue;
+                    }
+                    if (should_skip_chunk_by_time(cm, min_time_hint)) {
+                        continue;
+                    }
+                    if (should_skip_chunk_by_offset(cm)) {
+                        continue;
+                    }
+                    chunk_reader_->reset();
+                    if (RET_FAIL(chunk_reader_->load_by_meta(cm))) {
+                    }
+                    break;
                 } else {
-                    if (!is_aligned_) {
-                        ChunkMeta* cm = get_current_chunk_meta();
-                        advance_to_next_chunk();
-                        // Skip by time filter.
-                        if (filter != nullptr && cm->statistic_ != nullptr &&
-                            !filter->satisfy(cm->statistic_)) {
-                            continue;
-                        }
-                        // Skip by min_time_hint (merge cursor).
-                        if (should_skip_chunk_by_time(cm, min_time_hint)) {
-                            continue;
-                        }
-                        // Single-path: skip entire chunk by offset using count.
-                        if (should_skip_chunk_by_offset(cm)) {
-                            continue;
-                        }
-                        chunk_reader_->reset();
-                        if (RET_FAIL(chunk_reader_->load_by_meta(cm))) {
-                        }
-                        break;
-                    } else {
-                        ChunkMeta* value_cm = value_chunk_meta_cursor_.get();
-                        ChunkMeta* time_cm = time_chunk_meta_cursor_.get();
-                        advance_to_next_chunk();
-                        if (filter != nullptr &&
-                            value_cm->statistic_ != nullptr &&
-                            !filter->satisfy(value_cm->statistic_)) {
-                            continue;
-                        }
-                        if (should_skip_chunk_by_time(value_cm,
-                                                      min_time_hint)) {
-                            continue;
-                        }
-                        if (should_skip_chunk_by_offset(value_cm)) {
-                            continue;
-                        }
-                        chunk_reader_->reset();
-                        if (RET_FAIL(chunk_reader_->load_by_aligned_meta(
-                                time_cm, value_cm))) {
-                        }
-                        break;
+                    ChunkMeta* value_cm = value_chunk_meta_cursor_.get();
+                    ChunkMeta* time_cm = time_chunk_meta_cursor_.get();
+                    advance_to_next_chunk();
+                    ChunkMeta* filter_cm =
+                        (time_cm->statistic_ != nullptr) ? time_cm : value_cm;
+                    if (filter != nullptr && filter_cm->statistic_ != nullptr &&
+                        !filter->satisfy(filter_cm->statistic_)) {
+                        continue;
+                    }
+                    if (should_skip_chunk_by_time(filter_cm, min_time_hint)) {
+                        continue;
+                    }
+                    if (should_skip_chunk_by_offset(value_cm)) {
+                        continue;
+                    }
+                    chunk_reader_->reset();
+                    if (RET_FAIL(chunk_reader_->load_by_aligned_meta(
+                            time_cm, value_cm))) {
                     }
+                    break;
                 }
             }
         }
         if (IS_SUCC(ret)) {
             if (alloc && ret_tsblock == nullptr) {
-                ret_tsblock = alloc_tsblock();
+                ret_tsblock =
+                    is_multi_value_ ? alloc_tsblock_multi() : alloc_tsblock();
             }
             ret = chunk_reader_->get_next_page(ret_tsblock, filter, *data_pa_,
                                                min_time_hint, row_offset_,
@@ -147,6 +176,12 @@ void TsFileSeriesScanIterator::revert_tsblock() {
 int TsFileSeriesScanIterator::init_chunk_reader() {
     int ret = E_OK;
     is_aligned_ = itimeseries_index_->get_data_type() == common::VECTOR;
+
+    // Check if this is a multi-value aligned index
+    if (is_aligned_ && itimeseries_index_->get_value_column_count() > 1) {
+        return init_chunk_reader_multi();
+    }
+
     if (!is_aligned_) {
         void* buf =
             common::mem_alloc(sizeof(ChunkReader), common::MOD_CHUNK_READER);
@@ -173,6 +208,63 @@ int TsFileSeriesScanIterator::init_chunk_reader() {
     return ret;
 }
 
+int TsFileSeriesScanIterator::init_chunk_reader_multi() {
+    int ret = E_OK;
+    is_multi_value_ = true;
+
+    void* buf =
+        common::mem_alloc(sizeof(AlignedChunkReader), common::MOD_CHUNK_READER);
+    auto* acr = new (buf) AlignedChunkReader;
+    chunk_reader_ = acr;
+
+    uint32_t num_cols = itimeseries_index_->get_value_column_count();
+#ifdef ENABLE_THREADS
+    // Create decode thread pool once at SSI level, shared across all chunks.
+    if (num_cols > 1 && common::g_config_value_.parallel_read_enabled_) {
+        int max_threads = common::g_config_value_.read_thread_count_;
+        int nthreads = std::min((int)num_cols, max_threads);
+        decode_pool_ = new common::ThreadPool(nthreads);
+        acr->set_decode_pool(decode_pool_);
+    }
+#endif
+
+    // Init time cursor
+    time_chunk_meta_cursor_ =
+        itimeseries_index_->get_time_chunk_meta_list()->begin();
+
+    // Init all value cursors
+    value_chunk_meta_cursors_.resize(num_cols);
+    for (uint32_t c = 0; c < num_cols; c++) {
+        value_chunk_meta_cursors_[c] =
+            itimeseries_index_->get_value_chunk_meta_list(c)->begin();
+    }
+
+    // Init chunk reader
+    if (RET_FAIL(
+            acr->init(read_file_, itimeseries_index_->get_measurement_name(),
+                      itimeseries_index_->get_data_type(), time_filter_))) {
+        return ret;
+    }
+
+    // Load first chunk set
+    ChunkMeta* time_cm = time_chunk_meta_cursor_.get();
+    std::vector<ChunkMeta*> value_cms;
+    value_cms.reserve(num_cols);
+    for (uint32_t c = 0; c < num_cols; c++) {
+        value_cms.push_back(value_chunk_meta_cursors_[c].get());
+    }
+
+    if (RET_FAIL(acr->load_by_aligned_meta_multi(time_cm, value_cms))) {
+        return ret;
+    }
+
+    // Advance cursors
+    time_chunk_meta_cursor_++;
+    for (auto& cur : value_chunk_meta_cursors_) cur++;
+
+    return ret;
+}
+
 TsBlock* TsFileSeriesScanIterator::alloc_tsblock() {
     ChunkHeader& ch = chunk_reader_->get_chunk_header();
 
@@ -193,4 +285,29 @@ TsBlock* TsFileSeriesScanIterator::alloc_tsblock() {
     return tsblock_;
 }
 
-}  // end namespace storage
\ No newline at end of file
+TsBlock* TsFileSeriesScanIterator::alloc_tsblock_multi() {
+    auto* acr = static_cast<AlignedChunkReader*>(chunk_reader_);
+
+    // Time column
+    ColumnSchema time_cd("time", common::INT64, common::SNAPPY,
+                         common::TS_2DIFF);
+    tuple_desc_.push_back(time_cd);
+
+    // Value columns
+    uint32_t num_cols = acr->get_value_column_count();
+    for (uint32_t c = 0; c < num_cols; c++) {
+        ChunkHeader& ch = acr->get_value_chunk_header(c);
+        ColumnSchema value_cd(ch.measurement_name_, ch.data_type_,
+                              ch.compression_type_, ch.encoding_type_);
+        tuple_desc_.push_back(value_cd);
+    }
+
+    tsblock_ = new TsBlock(&tuple_desc_);
+    if (E_OK != tsblock_->init()) {
+        delete tsblock_;
+        tsblock_ = nullptr;
+    }
+    return tsblock_;
+}
+
+}  // end namespace storage
diff --git a/cpp/src/reader/tsfile_series_scan_iterator.h b/cpp/src/reader/tsfile_series_scan_iterator.h
index 06b35ba1..980f1848 100644
--- a/cpp/src/reader/tsfile_series_scan_iterator.h
+++ b/cpp/src/reader/tsfile_series_scan_iterator.h
@@ -31,6 +31,12 @@
 #include "reader/filter/filter.h"
 #include "utils/util_define.h"
 
+#ifdef ENABLE_THREADS
+namespace common {
+class ThreadPool;
+}
+#endif
+
 namespace storage {
 
 class TsFileIOReader;
@@ -50,6 +56,7 @@ class TsFileSeriesScanIterator {
           tsblock_(nullptr),
           time_filter_(nullptr),
           is_aligned_(false),
+          is_multi_value_(false),
           row_offset_(0),
           row_limit_(-1) {}
     ~TsFileSeriesScanIterator() { destroy(); }
@@ -66,38 +73,40 @@ class TsFileSeriesScanIterator {
     }
     void destroy();
 
-    /**
-     * Set row-level offset and limit for single-path optimization.
-     * When set, the SSI uses chunk/page statistics (count) to skip
-     * entire chunks/pages without decoding.
-     */
     void set_row_range(int offset, int limit) {
         row_offset_ = offset;
         row_limit_ = limit;
     }
 
-    /** Current row offset/limit after chunk/page skip; used to sync with QDS
-     * for single-path. */
     int get_row_offset() const { return row_offset_; }
     int get_row_limit() const { return row_limit_; }
 
-    /*
-     * If oneshoot filter specified, use it instead of this->time_filter_.
-     * @param min_time_hint  When not INT64_MIN, chunks whose end_time
-     *                       < min_time_hint are skipped without loading.
-     *                       Used by merge layer to push down the current
-     *                       merge cursor.
-     */
     int get_next(common::TsBlock*& ret_tsblock, bool alloc_tsblock,
                  Filter* oneshoot_filter = nullptr,
                  int64_t min_time_hint = std::numeric_limits<int64_t>::min());
     void revert_tsblock();
 
+    uint32_t get_value_column_count() const {
+        if (is_multi_value_ && chunk_reader_) {
+            auto* acr = static_cast<AlignedChunkReader*>(chunk_reader_);
+            return acr->get_value_column_count();
+        }
+        return 1;
+    }
+
+    bool is_multi_value() const { return is_multi_value_; }
+
     friend class TsFileIOReader;
 
    private:
     int init_chunk_reader();
+    int init_chunk_reader_multi();
     FORCE_INLINE bool has_next_chunk() const {
+        if (is_multi_value_) {
+            return !value_chunk_meta_cursors_.empty() &&
+                   value_chunk_meta_cursors_[0] !=
+                       itimeseries_index_->get_value_chunk_meta_list(0)->end();
+        }
         if (is_aligned_) {
             return value_chunk_meta_cursor_ !=
                    itimeseries_index_->get_value_chunk_meta_list()->end();
@@ -107,7 +116,10 @@ class TsFileSeriesScanIterator {
         }
     }
     FORCE_INLINE void advance_to_next_chunk() {
-        if (is_aligned_) {
+        if (is_multi_value_) {
+            time_chunk_meta_cursor_++;
+            for (auto& cur : value_chunk_meta_cursors_) cur++;
+        } else if (is_aligned_) {
             time_chunk_meta_cursor_++;
             value_chunk_meta_cursor_++;
         } else {
@@ -120,6 +132,7 @@ class TsFileSeriesScanIterator {
     bool should_skip_chunk_by_time(ChunkMeta* cm, int64_t min_time_hint);
     bool should_skip_chunk_by_offset(ChunkMeta* cm);
     common::TsBlock* alloc_tsblock();
+    common::TsBlock* alloc_tsblock_multi();
 
    private:
     ReadFile* read_file_;
@@ -132,14 +145,22 @@ class TsFileSeriesScanIterator {
     common::SimpleList<ChunkMeta*>::Iterator chunk_meta_cursor_;
     common::SimpleList<ChunkMeta*>::Iterator time_chunk_meta_cursor_;
     common::SimpleList<ChunkMeta*>::Iterator value_chunk_meta_cursor_;
+    // Multi-value: one cursor per value column
+    std::vector<common::SimpleList<ChunkMeta*>::Iterator>
+        value_chunk_meta_cursors_;
     IChunkReader* chunk_reader_;
 
     common::TupleDesc tuple_desc_;
     common::TsBlock* tsblock_;
     Filter* time_filter_;
     bool is_aligned_ = false;
+    bool is_multi_value_ = false;
     int row_offset_;
     int row_limit_;
+#ifdef ENABLE_THREADS
+    common::ThreadPool* decode_pool_ =
+        nullptr;  // owned, for multi-value decode
+#endif
 };
 
 }  // end namespace storage
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc
index e115552e..5555ce3c 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_batch_test.cc
@@ -22,6 +22,7 @@
 #include <random>
 #include <vector>
 
+#include "common/global.h"
 #include "common/record.h"
 #include "common/schema.h"
 #include "common/tablet.h"
@@ -35,10 +36,11 @@
 using namespace storage;
 using namespace common;
 
-class TsFileTableReaderBatchTest : public ::testing::Test {
+class TsFileTableReaderBatchTest : public ::testing::TestWithParam<bool> {
    protected:
     void SetUp() override {
         libtsfile_init();
+        set_parallel_read_enabled(GetParam());
         file_name_ = std::string("tsfile_reader_table_batch_test_") +
                      generate_random_string(10) + std::string(".tsfile");
         remove(file_name_.c_str());
@@ -173,7 +175,7 @@ class TsFileTableReaderBatchTest : public ::testing::Test {
     }
 };
 
-TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) {
+TEST_P(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) {
     auto table_schema = gen_table_schema();
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -251,7 +253,7 @@ TEST_F(TsFileTableReaderBatchTest, BatchQueryWithSmallBatchSize) {
     delete table_schema;
 }
 
-TEST_F(TsFileTableReaderBatchTest, BatchQueryWithLargeBatchSize) {
+TEST_P(TsFileTableReaderBatchTest, BatchQueryWithLargeBatchSize) {
     auto table_schema = gen_table_schema();
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -299,7 +301,7 @@ TEST_F(TsFileTableReaderBatchTest, BatchQueryWithLargeBatchSize) {
     delete table_schema;
 }
 
-TEST_F(TsFileTableReaderBatchTest, BatchQueryVerifyDataCorrectness) {
+TEST_P(TsFileTableReaderBatchTest, BatchQueryVerifyDataCorrectness) {
     auto table_schema = gen_table_schema();
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -361,7 +363,7 @@ TEST_F(TsFileTableReaderBatchTest, BatchQueryVerifyDataCorrectness) {
     delete table_schema;
 }
 
-TEST_F(TsFileTableReaderBatchTest, PerformanceComparisonSinglePointVsBatch) {
+TEST_P(TsFileTableReaderBatchTest, PerformanceComparisonSinglePointVsBatch) {
     // Create table schema without tags (only fields)
     auto table_schema = gen_table_schema_no_tag();
     auto tsfile_table_writer_ =
@@ -467,3 +469,8 @@ TEST_F(TsFileTableReaderBatchTest, PerformanceComparisonSinglePointVsBatch) {
 
     delete table_schema;
 }
+
+INSTANTIATE_TEST_SUITE_P(Serial, TsFileTableReaderBatchTest,
+                         ::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TsFileTableReaderBatchTest,
+                         ::testing::Values(true));
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index 1f63573e..acbb5901 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -25,6 +25,7 @@
 #include "common/tablet.h"
 #include "file/tsfile_io_writer.h"
 #include "file/write_file.h"
+#include "common/global.h"
 #include "reader/filter/tag_filter.h"
 #include "reader/table_result_set.h"
 #include "reader/tsfile_reader.h"
@@ -34,10 +35,11 @@
 using namespace storage;
 using namespace common;
 
-class TsFileTableReaderTest : public ::testing::Test {
+class TsFileTableReaderTest : public ::testing::TestWithParam<bool> {
    protected:
     void SetUp() override {
         libtsfile_init();
+        set_parallel_read_enabled(GetParam());
         file_name_ = std::string("tsfile_writer_table_test_") +
                      generate_random_string(10) + std::string(".tsfile");
         remove(file_name_.c_str());
@@ -207,9 +209,9 @@ class TsFileTableReaderTest : public ::testing::Test {
     }
 };
 
-TEST_F(TsFileTableReaderTest, TableModelQuery) { test_table_model_query(); }
+TEST_P(TsFileTableReaderTest, TableModelQuery) { test_table_model_query(); }
 
-TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) {
+TEST_P(TsFileTableReaderTest, TableModelQueryOneSmallPage) {
     int prev_config = g_config_value_.page_writer_max_point_num_;
     g_config_value_.page_writer_max_point_num_ = 5;
     test_table_model_query(g_config_value_.page_writer_max_point_num_);
@@ -221,7 +223,7 @@ TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage) {
 // time-page-sealed / value-page-not-sealed inconsistency).
 // Use 512 bytes so time seals by size before point count; 128 was too small
 // and could produce misaligned time/value pages on some encodings.
-TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) {
+TEST_P(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) {
     uint32_t prev_point_num = g_config_value_.page_writer_max_point_num_;
     uint32_t prev_mem_bytes = g_config_value_.page_writer_max_memory_bytes_;
     g_config_value_.page_writer_max_point_num_ = 10000;
@@ -231,32 +233,32 @@ TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) {
     g_config_value_.page_writer_max_memory_bytes_ = prev_mem_bytes;
 }
 
-TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) {
+TEST_P(TsFileTableReaderTest, TableModelQueryOneLargePage) {
     int prev_config = g_config_value_.page_writer_max_point_num_;
     g_config_value_.page_writer_max_point_num_ = 10000;
     test_table_model_query(g_config_value_.page_writer_max_point_num_);
     g_config_value_.page_writer_max_point_num_ = prev_config;
 }
 
-TEST_F(TsFileTableReaderTest, TableModelQueryMultiLargePage) {
+TEST_P(TsFileTableReaderTest, TableModelQueryMultiLargePage) {
     int prev_config = g_config_value_.page_writer_max_point_num_;
     g_config_value_.page_writer_max_point_num_ = 10000;
     test_table_model_query(1000000);
     g_config_value_.page_writer_max_point_num_ = prev_config;
 }
 
-TEST_F(TsFileTableReaderTest, TableModelQueryMultiDevices) {
+TEST_P(TsFileTableReaderTest, TableModelQueryMultiDevices) {
     int prev_config = g_config_value_.page_writer_max_point_num_;
     g_config_value_.page_writer_max_point_num_ = 10000;
     test_table_model_query(g_config_value_.page_writer_max_point_num_, 10);
     g_config_value_.page_writer_max_point_num_ = prev_config;
 }
 
-TEST_F(TsFileTableReaderTest, TableModelQueryWithTimeFilter) {
+TEST_P(TsFileTableReaderTest, TableModelQueryWithTimeFilter) {
     test_table_model_query(10, 1, 2);
 }
 
-TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {
+TEST_P(TsFileTableReaderTest, TableModelResultMetadata) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -293,7 +295,7 @@ TEST_F(TsFileTableReaderTest, TableModelResultMetadata) {
     delete table_schema;
 }
 
-TEST_F(TsFileTableReaderTest, TableModelGetSchema) {
+TEST_P(TsFileTableReaderTest, TableModelGetSchema) {
     auto tmp_table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, tmp_table_schema);
@@ -353,7 +355,7 @@ TEST_F(TsFileTableReaderTest, TableModelGetSchema) {
     delete tmp_table_schema;
 }
 
-TEST_F(TsFileTableReaderTest, TableModelQueryWithMultiTabletsMultiFlush) {
+TEST_P(TsFileTableReaderTest, TableModelQueryWithMultiTabletsMultiFlush) {
     auto tmp_table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, tmp_table_schema);
@@ -408,7 +410,7 @@ TEST_F(TsFileTableReaderTest, TableModelQueryWithMultiTabletsMultiFlush) {
     delete tmp_table_schema;
 }
 
-TEST_F(TsFileTableReaderTest, ReadNonExistColumn) {
+TEST_P(TsFileTableReaderTest, ReadNonExistColumn) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     measurement_schemas.resize(2);
@@ -444,7 +446,7 @@ TEST_F(TsFileTableReaderTest, ReadNonExistColumn) {
     delete table_schema;
 }
 
-TEST_F(TsFileTableReaderTest, TestDecoder) {
+TEST_P(TsFileTableReaderTest, TestDecoder) {
     std::vector<ColumnSchema> column_schema;
     column_schema.emplace_back("value1", TSDataType::INT32);
     auto* schema = new TableSchema("test_table", column_schema);
@@ -574,7 +576,7 @@ void test_null_table(WriteFile* write_file, int max_rows,
     reader.close();
 }
 
-TEST_F(TsFileTableReaderTest, TestNullInTable) {
+TEST_P(TsFileTableReaderTest, TestNullInTable) {
     // 1. In some rows, all FIELD columns are empty.
     test_null_table(
         &write_file_, 10,
@@ -612,7 +614,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable) {
         });
 }
 
-TEST_F(TsFileTableReaderTest, TestNullInTable2) {
+TEST_P(TsFileTableReaderTest, TestNullInTable2) {
     // 2. In some rows, the TAG column is entirely empty,
     // and in some rows, all FIELD columns are empty.
     test_null_table(
@@ -651,7 +653,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable2) {
         });
 }
 
-TEST_F(TsFileTableReaderTest, TestNullInTable3) {
+TEST_P(TsFileTableReaderTest, TestNullInTable3) {
     // 3. In some rows, the TAG and Field columns are entirely empty,
     test_null_table(
         &write_file_, 10,
@@ -688,7 +690,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable3) {
         });
 }
 
-TEST_F(TsFileTableReaderTest, TestNullInTable4) {
+TEST_P(TsFileTableReaderTest, TestNullInTable4) {
     // 3. In some rows, the TAG and Field columns are entirely empty,
     test_null_table(
         &write_file_, 1000000,
@@ -723,7 +725,7 @@ TEST_F(TsFileTableReaderTest, TestNullInTable4) {
         });
 }
 
-TEST_F(TsFileTableReaderTest, TestTimeColumnReader) {
+TEST_P(TsFileTableReaderTest, TestTimeColumnReader) {
     std::vector<common::ColumnSchema> column_schemas;
     column_schemas.emplace_back("s0", TSDataType::INT64,
                                 CompressionType::UNCOMPRESSED,
@@ -808,7 +810,7 @@ TEST_F(TsFileTableReaderTest, TestTimeColumnReader) {
 // When a TsBlock is full (block_size=1024) and the next row to decode is a
 // NULL value in aligned data, the old code consumed the timestamp before
 // checking add_row(), silently losing that row on E_OVERFLOW.
-TEST_F(TsFileTableReaderTest, AlignedNullAtBlockBoundaryNoRowLoss) {
+TEST_P(TsFileTableReaderTest, AlignedNullAtBlockBoundaryNoRowLoss) {
     // block_size in RETURN_ROW mode is 1024.
     const int32_t block_size = 1024;
     // Write enough rows so that overflow happens multiple times,
@@ -885,4 +887,9 @@ TEST_F(TsFileTableReaderTest, 
AlignedNullAtBlockBoundaryNoRowLoss) {
     ASSERT_EQ(nullable_rows, total_rows);
 
     ASSERT_EQ(reader.close(), common::E_OK);
-}
\ No newline at end of file
+}
+
+INSTANTIATE_TEST_SUITE_P(Serial, TsFileTableReaderTest,
+                         ::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TsFileTableReaderTest,
+                         ::testing::Values(true));
\ No newline at end of file
diff --git a/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc 
b/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
index 026f75b2..abeedffc 100644
--- a/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
+++ b/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
@@ -35,10 +35,11 @@
 using namespace storage;
 using namespace common;
 
-class TableQueryByRowTest : public ::testing::Test {
+class TableQueryByRowTest : public ::testing::TestWithParam<bool> {
    protected:
     void SetUp() override {
         libtsfile_init();
+        set_parallel_read_enabled(GetParam());
         file_name_ = std::string("table_query_by_row_test_") +
                      generate_random_string(10) + std::string(".tsfile");
         remove(file_name_.c_str());
@@ -346,7 +347,7 @@ class TableQueryByRowTest : public ::testing::Test {
 };
 
 // No offset or limit: queryByRow(0, -1) returns the same rows as full query.
-TEST_F(TableQueryByRowTest, NoOffsetNoLimit) {
+TEST_P(TableQueryByRowTest, NoOffsetNoLimit) {
     int num_rows = 50;
     write_single_device_file(num_rows);
 
@@ -357,7 +358,7 @@ TEST_F(TableQueryByRowTest, NoOffsetNoLimit) {
 }
 
 // Offset only: skip first N rows, return the rest; limit=-1 means no cap.
-TEST_F(TableQueryByRowTest, OffsetOnly) {
+TEST_P(TableQueryByRowTest, OffsetOnly) {
     int num_rows = 50;
     write_single_device_file(num_rows);
 
@@ -371,7 +372,7 @@ TEST_F(TableQueryByRowTest, OffsetOnly) {
 }
 
 // Limit only: return at most M rows from the start; offset=0.
-TEST_F(TableQueryByRowTest, LimitOnly) {
+TEST_P(TableQueryByRowTest, LimitOnly) {
     int num_rows = 50;
     write_single_device_file(num_rows);
 
@@ -385,7 +386,7 @@ TEST_F(TableQueryByRowTest, LimitOnly) {
 }
 
 // Both offset and limit: skip first N rows, then return at most M rows.
-TEST_F(TableQueryByRowTest, OffsetAndLimit) {
+TEST_P(TableQueryByRowTest, OffsetAndLimit) {
     int num_rows = 100;
     write_single_device_file(num_rows);
 
@@ -400,7 +401,7 @@ TEST_F(TableQueryByRowTest, OffsetAndLimit) {
 }
 
 // Offset beyond total row count: returns empty result.
-TEST_F(TableQueryByRowTest, OffsetBeyondData) {
+TEST_P(TableQueryByRowTest, OffsetBeyondData) {
     int num_rows = 30;
     write_single_device_file(num_rows);
 
@@ -409,7 +410,7 @@ TEST_F(TableQueryByRowTest, OffsetBeyondData) {
 }
 
 // Limit zero: returns no rows (no data read).
-TEST_F(TableQueryByRowTest, LimitZero) {
+TEST_P(TableQueryByRowTest, LimitZero) {
     int num_rows = 30;
     write_single_device_file(num_rows);
 
@@ -419,7 +420,7 @@ TEST_F(TableQueryByRowTest, LimitZero) {
 
 // Offset + limit exceeds total: returns all rows after offset (less than
 // limit).
-TEST_F(TableQueryByRowTest, OffsetPlusLimitExceedsTotal) {
+TEST_P(TableQueryByRowTest, OffsetPlusLimitExceedsTotal) {
     int num_rows = 50;
     write_single_device_file(num_rows);
 
@@ -434,7 +435,7 @@ TEST_F(TableQueryByRowTest, OffsetPlusLimitExceedsTotal) {
 }
 
 // Multi-device, no offset/limit: queryByRow(0, -1) matches full query order.
-TEST_F(TableQueryByRowTest, MultiDeviceNoOffset) {
+TEST_P(TableQueryByRowTest, MultiDeviceNoOffset) {
     int rows_per_device = 20;
     int device_count = 3;
     write_multi_device_file(rows_per_device, device_count);
@@ -446,7 +447,7 @@ TEST_F(TableQueryByRowTest, MultiDeviceNoOffset) {
 }
 
 // Multi-device, offset within first device: skip applies to global row order.
-TEST_F(TableQueryByRowTest, MultiDeviceOffsetWithinFirstDevice) {
+TEST_P(TableQueryByRowTest, MultiDeviceOffsetWithinFirstDevice) {
     int rows_per_device = 20;
     int device_count = 3;
     write_multi_device_file(rows_per_device, device_count);
@@ -462,7 +463,7 @@ TEST_F(TableQueryByRowTest, 
MultiDeviceOffsetWithinFirstDevice) {
 
 // Multi-device, offset skips entire first device(s): verifies device-level
 // skip.
-TEST_F(TableQueryByRowTest, MultiDeviceOffsetSkipsEntireDevice) {
+TEST_P(TableQueryByRowTest, MultiDeviceOffsetSkipsEntireDevice) {
     int rows_per_device = 20;
     int device_count = 3;
     write_multi_device_file(rows_per_device, device_count);
@@ -479,7 +480,7 @@ TEST_F(TableQueryByRowTest, 
MultiDeviceOffsetSkipsEntireDevice) {
 
 // Multi-device, offset and limit span device boundary: correct cross-device
 // slice.
-TEST_F(TableQueryByRowTest, MultiDeviceOffsetSpansDeviceBoundary) {
+TEST_P(TableQueryByRowTest, MultiDeviceOffsetSpansDeviceBoundary) {
     int rows_per_device = 20;
     int device_count = 3;
     write_multi_device_file(rows_per_device, device_count);
@@ -495,7 +496,7 @@ TEST_F(TableQueryByRowTest, 
MultiDeviceOffsetSpansDeviceBoundary) {
 }
 
 // Multi-device, offset beyond all data: returns empty.
-TEST_F(TableQueryByRowTest, MultiDeviceOffsetSkipsAllDevices) {
+TEST_P(TableQueryByRowTest, MultiDeviceOffsetSkipsAllDevices) {
     int rows_per_device = 10;
     int device_count = 3;
     write_multi_device_file(rows_per_device, device_count);
@@ -506,7 +507,7 @@ TEST_F(TableQueryByRowTest, 
MultiDeviceOffsetSkipsAllDevices) {
 
 // Single device: queryByRow(offset, limit) equals full query + manual
 // skip/limit in app.
-TEST_F(TableQueryByRowTest, EquivalenceWithManualSkip) {
+TEST_P(TableQueryByRowTest, EquivalenceWithManualSkip) {
     int num_rows = 200;
     write_single_device_file(num_rows);
 
@@ -542,7 +543,7 @@ TEST_F(TableQueryByRowTest, EquivalenceWithManualSkip) {
 
 // Multi-device: queryByRow(offset, limit) equals full query + manual 
skip/limit
 // in app.
-TEST_F(TableQueryByRowTest, MultiDeviceEquivalenceWithManualSkip) {
+TEST_P(TableQueryByRowTest, MultiDeviceEquivalenceWithManualSkip) {
     int rows_per_device = 30;
     int device_count = 4;
     write_multi_device_file(rows_per_device, device_count);
@@ -578,7 +579,7 @@ TEST_F(TableQueryByRowTest, 
MultiDeviceEquivalenceWithManualSkip) {
 }
 
 // Large single-device dataset: offset and limit correctness with many rows.
-TEST_F(TableQueryByRowTest, LargeDatasetOffsetLimit) {
+TEST_P(TableQueryByRowTest, LargeDatasetOffsetLimit) {
     int num_rows = 5000;
     write_single_device_file(num_rows);
 
@@ -592,7 +593,7 @@ TEST_F(TableQueryByRowTest, LargeDatasetOffsetLimit) {
     }
 }
 
-TEST_F(TableQueryByRowTest, DenseAlignedNullsMustUseTimeRowCount) {
+TEST_P(TableQueryByRowTest, DenseAlignedNullsMustUseTimeRowCount) {
     const int rows_per_batch = 200;
     const int num_batches = 4;
     write_single_device_sparse_multi_chunk_with_equal_missing(
@@ -615,7 +616,7 @@ TEST_F(TableQueryByRowTest, 
DenseAlignedNullsMustUseTimeRowCount) {
 // Chunks/Pages by ChunkMeta/PageHeader count without decoding. Multi-chunk
 // file is produced by small memory_threshold and multiple flush; offset/limit
 // are chosen so that at least one Chunk is skipped and result is correct.
-TEST_F(TableQueryByRowTest, DenseSingleDeviceSsiLevelPushdown) {
+TEST_P(TableQueryByRowTest, DenseSingleDeviceSsiLevelPushdown) {
     const int rows_per_batch = 300;
     const int num_batches = 4;
     write_single_device_dense_multi_chunk(rows_per_batch, num_batches,
@@ -653,7 +654,7 @@ TEST_F(TableQueryByRowTest, 
DenseSingleDeviceSsiLevelPushdown) {
 // Pushdown is faster than full query + manual next: queryByRow(offset, limit)
 // skips at device/SSI/Chunk level; old query then manual next decodes every
 // row. Timing tolerance 20% to allow measurement noise.
-TEST_F(TableQueryByRowTest, DISABLED_QueryByRowFasterThanManualNext) {
+TEST_P(TableQueryByRowTest, DISABLED_QueryByRowFasterThanManualNext) {
     const int num_rows = 8000;
     const int offset = 3000;
     const int limit = 1000;
@@ -728,7 +729,7 @@ TEST_F(TableQueryByRowTest, 
DISABLED_QueryByRowFasterThanManualNext) {
 
 // queryByRow with tag filter: only rows matching the tag predicate are
 // returned.
-TEST_F(TableQueryByRowTest, TagFilterEq) {
+TEST_P(TableQueryByRowTest, TagFilterEq) {
     int rows_per_device = 20;
     int device_count = 3;
     write_multi_device_file(rows_per_device, device_count);
@@ -769,3 +770,8 @@ TEST_F(TableQueryByRowTest, TagFilterEq) {
         EXPECT_EQ(filtered_s1[t], static_cast<int64_t>(1 * 1000 + t));
     }
 }
+
+INSTANTIATE_TEST_SUITE_P(Serial, TableQueryByRowTest,
+                         ::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TableQueryByRowTest,
+                         ::testing::Values(true));
diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc 
b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
index 8181b613..9c1bc765 100644
--- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
+++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc
@@ -24,6 +24,7 @@
 #include "common/schema.h"
 #include "common/tablet.h"
 #include "file/write_file.h"
+#include "common/global.h"
 #include "reader/result_set.h"
 #include "reader/tsfile_reader.h"
 #include "reader/tsfile_tree_reader.h"
@@ -33,10 +34,11 @@
 using namespace storage;
 using namespace common;
 
-class TsFileTreeReaderTest : public ::testing::Test {
+class TsFileTreeReaderTest : public ::testing::TestWithParam<bool> {
    protected:
     void SetUp() override {
         libtsfile_init();
+        set_parallel_read_enabled(GetParam());
         file_name_ = std::string("tsfile_writer_tree_reader_test_") +
                      generate_random_string(10) + std::string(".tsfile");
         remove(file_name_.c_str());
@@ -73,7 +75,7 @@ class TsFileTreeReaderTest : public ::testing::Test {
     }
 };
 
-TEST_F(TsFileTreeReaderTest, BasicTest) {
+TEST_P(TsFileTreeReaderTest, BasicTest) {
     TsFileTreeWriter writer(&write_file_);
     std::string device_id = "test_device";
     std::string measurement_id = "test_measurement";
@@ -107,7 +109,7 @@ TEST_F(TsFileTreeReaderTest, BasicTest) {
     reader.close();
 }
 
-TEST_F(TsFileTreeReaderTest, ReadTreeByTable) {
+TEST_P(TsFileTreeReaderTest, ReadTreeByTable) {
     TsFileTreeWriter writer(&write_file_);
     std::vector<std::string> device_ids = {"root.db1.t1", "root.db2.t1",
                                            "root.db3.t2.t3", "root.db3.t3",
@@ -185,7 +187,7 @@ TEST_F(TsFileTreeReaderTest, ReadTreeByTable) {
     reader.close();
 }
 
-TEST_F(TsFileTreeReaderTest, ReadTreeByTableIrrergular) {
+TEST_P(TsFileTreeReaderTest, ReadTreeByTableIrrergular) {
     TsFileTreeWriter writer(&write_file_);
     std::vector<std::string> device_ids = {"root.db1.t1",
                                            "root.db2.t1",
@@ -288,7 +290,7 @@ TEST_F(TsFileTreeReaderTest, ReadTreeByTableIrrergular) {
     reader.close();
 }
 
-TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
+TEST_P(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
     TsFileTreeWriter writer(&write_file_);
     std::vector<std::string> device_ids = {"device_1", "device_2", "device_3"};
     std::vector<std::string> measurement_ids = {"temperature", "humidity",
@@ -434,7 +436,7 @@ TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) {
 //    "root" instead of "root.sensors".
 // 2. load_device_index_entry used operator[] on the table map which inserted a
 //    null entry, then asserted on it.
-TEST_F(TsFileTreeReaderTest, QueryTableOnTreeDeepDevicePath) {
+TEST_P(TsFileTreeReaderTest, QueryTableOnTreeDeepDevicePath) {
     TsFileTreeWriter writer(&write_file_);
     // Device paths with 3 dot-segments: table_name="root.sensors", device="TH"
     std::string device_id = "root.sensors.TH";
@@ -478,7 +480,7 @@ TEST_F(TsFileTreeReaderTest, 
QueryTableOnTreeDeepDevicePath) {
 // up the table node, which silently inserted a null entry and then asserted.
 // After the fix it uses find() and returns E_DEVICE_NOT_EXIST gracefully.
 // This is triggered when querying a measurement that no device in the file 
has.
-TEST_F(TsFileTreeReaderTest, QueryTableOnTreeMissingMeasurement) {
+TEST_P(TsFileTreeReaderTest, QueryTableOnTreeMissingMeasurement) {
     // Use the same multi-device setup as ReadTreeByTable to ensure a valid
     // file.
     TsFileTreeWriter writer(&write_file_);
@@ -509,3 +511,8 @@ TEST_F(TsFileTreeReaderTest, 
QueryTableOnTreeMissingMeasurement) {
     }
     reader.close();
 }
+
+INSTANTIATE_TEST_SUITE_P(Serial, TsFileTreeReaderTest,
+                         ::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TsFileTreeReaderTest,
+                         ::testing::Values(true));
diff --git a/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc 
b/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc
index 74845e44..1aeac11d 100644
--- a/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc
+++ b/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc
@@ -32,10 +32,11 @@
 using namespace storage;
 using namespace common;
 
-class TreeQueryByRowTest : public ::testing::Test {
+class TreeQueryByRowTest : public ::testing::TestWithParam<bool> {
    protected:
     void SetUp() override {
         libtsfile_init();
+        set_parallel_read_enabled(GetParam());
         file_name_ = std::string("tree_query_by_row_test_") +
                      generate_random_string(10) + std::string(".tsfile");
         remove(file_name_.c_str());
@@ -110,7 +111,7 @@ class TreeQueryByRowTest : public ::testing::Test {
 };
 
 // Basic test: queryByRow returns correct total count with no offset/limit.
-TEST_F(TreeQueryByRowTest, NoOffsetNoLimit) {
+TEST_P(TreeQueryByRowTest, NoOffsetNoLimit) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 10;
@@ -134,7 +135,7 @@ TEST_F(TreeQueryByRowTest, NoOffsetNoLimit) {
 }
 
 // Test: offset skips leading rows.
-TEST_F(TreeQueryByRowTest, OffsetOnly) {
+TEST_P(TreeQueryByRowTest, OffsetOnly) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 10;
@@ -160,7 +161,7 @@ TEST_F(TreeQueryByRowTest, OffsetOnly) {
 }
 
 // Test: limit caps the number of rows returned.
-TEST_F(TreeQueryByRowTest, LimitOnly) {
+TEST_P(TreeQueryByRowTest, LimitOnly) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 10;
@@ -185,7 +186,7 @@ TEST_F(TreeQueryByRowTest, LimitOnly) {
 }
 
 // Test: offset + limit combined.
-TEST_F(TreeQueryByRowTest, OffsetAndLimit) {
+TEST_P(TreeQueryByRowTest, OffsetAndLimit) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 20;
@@ -212,7 +213,7 @@ TEST_F(TreeQueryByRowTest, OffsetAndLimit) {
 }
 
 // Test: offset exceeds total rows → empty result.
-TEST_F(TreeQueryByRowTest, OffsetExceedsTotalRows) {
+TEST_P(TreeQueryByRowTest, OffsetExceedsTotalRows) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 5;
@@ -233,7 +234,7 @@ TEST_F(TreeQueryByRowTest, OffsetExceedsTotalRows) {
 }
 
 // Test: limit=0 → empty result.
-TEST_F(TreeQueryByRowTest, LimitZero) {
+TEST_P(TreeQueryByRowTest, LimitZero) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 10;
@@ -255,7 +256,7 @@ TEST_F(TreeQueryByRowTest, LimitZero) {
 
 // Test: multi-path (multiple devices, same measurement) merged by time.
 // All devices write at same timestamps, so merged row count = num_rows.
-TEST_F(TreeQueryByRowTest, MultiPathMerge) {
+TEST_P(TreeQueryByRowTest, MultiPathMerge) {
     std::vector<std::string> devices = {"d1", "d2", "d3"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 10;
@@ -281,7 +282,7 @@ TEST_F(TreeQueryByRowTest, MultiPathMerge) {
 }
 
 // Test: multi-path with offset and limit.
-TEST_F(TreeQueryByRowTest, MultiPathOffsetLimit) {
+TEST_P(TreeQueryByRowTest, MultiPathOffsetLimit) {
     std::vector<std::string> devices = {"d1", "d2"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 20;
@@ -308,7 +309,7 @@ TEST_F(TreeQueryByRowTest, MultiPathOffsetLimit) {
 }
 
 // Test: single path with multiple measurements.
-TEST_F(TreeQueryByRowTest, SingleDeviceMultipleMeasurements) {
+TEST_P(TreeQueryByRowTest, SingleDeviceMultipleMeasurements) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1", "s2", "s3"};
     int num_rows = 15;
@@ -364,7 +365,7 @@ TEST_F(TreeQueryByRowTest, 
SingleDeviceMultipleMeasurements) {
 }
 
 // Test: limit larger than available rows → returns all rows.
-TEST_F(TreeQueryByRowTest, LimitLargerThanAvailable) {
+TEST_P(TreeQueryByRowTest, LimitLargerThanAvailable) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 5;
@@ -385,7 +386,7 @@ TEST_F(TreeQueryByRowTest, LimitLargerThanAvailable) {
 }
 
 // Test: larger dataset to exercise chunk/page boundaries.
-TEST_F(TreeQueryByRowTest, LargeDatasetOffsetLimit) {
+TEST_P(TreeQueryByRowTest, LargeDatasetOffsetLimit) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     int num_rows = 5000;
@@ -412,7 +413,7 @@ TEST_F(TreeQueryByRowTest, LargeDatasetOffsetLimit) {
 }
 
 // Test: multi-device multi-measurement with interleaved timestamps.
-TEST_F(TreeQueryByRowTest, MultiDeviceMultiMeasurementInterleaved) {
+TEST_P(TreeQueryByRowTest, MultiDeviceMultiMeasurementInterleaved) {
     // Device d1 has timestamps 0,2,4,6,...
     // Device d2 has timestamps 1,3,5,7,...
     // After merge, rows are 0,1,2,3,...
@@ -518,7 +519,7 @@ static void write_single_path_multi_chunk(TsFileTreeWriter& 
writer,
 //   Chunk3 [t=60..89, count=30]
 
 // offset exactly equals one chunk: Chunk1 is skipped wholesale.
-TEST_F(TreeQueryByRowTest, SinglePath_SkipChunk_OffsetEqualsOneChunk) {
+TEST_P(TreeQueryByRowTest, SinglePath_SkipChunk_OffsetEqualsOneChunk) {
     PageGuard pg(10);  // 10 pts/page -> 3 pages/chunk
     {
         TsFileTreeWriter writer(&write_file_);
@@ -543,7 +544,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_SkipChunk_OffsetEqualsOneChunk) {
 }
 
 // offset equals two chunk counts: both Chunk1 and Chunk2 are skipped.
-TEST_F(TreeQueryByRowTest, SinglePath_SkipChunk_OffsetEqualsTwoChunks) {
+TEST_P(TreeQueryByRowTest, SinglePath_SkipChunk_OffsetEqualsTwoChunks) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -569,7 +570,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_SkipChunk_OffsetEqualsTwoChunks) {
 
 // offset = chunk_count - 1: Chunk1 cannot be skipped (count=30 > 29);
 // 29 rows consumed inside Chunk1, then result spans into Chunk2.
-TEST_F(TreeQueryByRowTest, SinglePath_OffsetJustBeforeChunkBoundary) {
+TEST_P(TreeQueryByRowTest, SinglePath_OffsetJustBeforeChunkBoundary) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -594,7 +595,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_OffsetJustBeforeChunkBoundary) {
 
 // offset = chunk_count + 1: Chunk1 is skipped; 1 row consumed inside
 // Chunk2; result starts at t=31.
-TEST_F(TreeQueryByRowTest, SinglePath_OffsetJustAfterChunkBoundary) {
+TEST_P(TreeQueryByRowTest, SinglePath_OffsetJustAfterChunkBoundary) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -630,7 +631,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_OffsetJustAfterChunkBoundary) {
 //   Page3 [t=20..29, count=10]
 
 // offset exactly equals one page: Page1 is skipped wholesale.
-TEST_F(TreeQueryByRowTest, SinglePath_SkipPage_OffsetEqualsOnePage) {
+TEST_P(TreeQueryByRowTest, SinglePath_SkipPage_OffsetEqualsOnePage) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -655,7 +656,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_SkipPage_OffsetEqualsOnePage) {
 }
 
 // offset equals two page counts: Page1 + Page2 are both skipped.
-TEST_F(TreeQueryByRowTest, SinglePath_SkipPage_OffsetEqualsTwoPages) {
+TEST_P(TreeQueryByRowTest, SinglePath_SkipPage_OffsetEqualsTwoPages) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -681,7 +682,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_SkipPage_OffsetEqualsTwoPages) {
 
 // offset = page_count - 1: Page1 cannot be skipped (count=10 > 9);
 // 9 rows consumed row-by-row inside Page1, then result spans Page2.
-TEST_F(TreeQueryByRowTest, SinglePath_SkipPage_OffsetJustBeforePageBoundary) {
+TEST_P(TreeQueryByRowTest, SinglePath_SkipPage_OffsetJustBeforePageBoundary) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -710,7 +711,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_SkipPage_OffsetJustBeforePageBoundary) {
 
 // limit < page_size: stop inside the first page.
 // row_limit_ reaches 0 mid-page; subsequent pages/chunks must not load.
-TEST_F(TreeQueryByRowTest, SinglePath_LimitStopsMidPage) {
+TEST_P(TreeQueryByRowTest, SinglePath_LimitStopsMidPage) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -734,7 +735,7 @@ TEST_F(TreeQueryByRowTest, SinglePath_LimitStopsMidPage) {
 }
 
 // limit = exactly one page: stop at the page boundary.
-TEST_F(TreeQueryByRowTest, SinglePath_LimitEqualsOnePage) {
+TEST_P(TreeQueryByRowTest, SinglePath_LimitEqualsOnePage) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -758,7 +759,7 @@ TEST_F(TreeQueryByRowTest, SinglePath_LimitEqualsOnePage) {
 }
 
 // limit = exactly one chunk (3 pages): stop at the chunk boundary.
-TEST_F(TreeQueryByRowTest, SinglePath_LimitEqualsOneChunk) {
+TEST_P(TreeQueryByRowTest, SinglePath_LimitEqualsOneChunk) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -783,7 +784,7 @@ TEST_F(TreeQueryByRowTest, SinglePath_LimitEqualsOneChunk) {
 }
 
 // offset skips 2 chunks; limit stops mid-page inside the 3rd chunk.
-TEST_F(TreeQueryByRowTest, SinglePath_SkipTwoChunksThenLimitMidPage) {
+TEST_P(TreeQueryByRowTest, SinglePath_SkipTwoChunksThenLimitMidPage) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -816,7 +817,7 @@ TEST_F(TreeQueryByRowTest, 
SinglePath_SkipTwoChunksThenLimitMidPage) {
 // correctness while exercising the multi-chunk merge path including
 // get_next_tsblock_with_hint (min_time_hint forwarding).
 
-TEST_F(TreeQueryByRowTest, MultiPath_OffsetLimitWithMultipleChunks) {
+TEST_P(TreeQueryByRowTest, MultiPath_OffsetLimitWithMultipleChunks) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -866,7 +867,7 @@ TEST_F(TreeQueryByRowTest, 
MultiPath_OffsetLimitWithMultipleChunks) {
 
 // Two devices with interleaved timestamps (d1=even, d2=odd), multiple
 // chunks each.  Merged stream is t=0,1,2,...,79 (80 rows).
-TEST_F(TreeQueryByRowTest, MultiPath_InterleavedTimestamps_MultipleChunks) {
+TEST_P(TreeQueryByRowTest, MultiPath_InterleavedTimestamps_MultipleChunks) {
     PageGuard pg(5);
     {
         std::string d1 = "d1", d2 = "d2";
@@ -913,7 +914,7 @@ TEST_F(TreeQueryByRowTest, 
MultiPath_InterleavedTimestamps_MultipleChunks) {
 }
 
 // Three devices, offset at exact chunk boundary, limit cuts mid-chunk.
-TEST_F(TreeQueryByRowTest, MultiPath_OffsetAtMergedChunkBoundary) {
+TEST_P(TreeQueryByRowTest, MultiPath_OffsetAtMergedChunkBoundary) {
     PageGuard pg(10);
     {
         TsFileTreeWriter writer(&write_file_);
@@ -977,7 +978,7 @@ TEST_F(TreeQueryByRowTest, 
MultiPath_OffsetAtMergedChunkBoundary) {
 //   - Total merged rows = 100 (all from d1, t=0..99).
 //   - d2 is non-null only at t=50..59 (the 10 rows from chunk-d2-1).
 //   - No out-of-order or duplicate timestamps.
-TEST_F(TreeQueryByRowTest, MultiPath_TimeHint_SkipsStaleChunk) {
+TEST_P(TreeQueryByRowTest, MultiPath_TimeHint_SkipsStaleChunk) {
     {
         TsFileTreeWriter writer(&write_file_);
         std::string d1 = "d1", d2 = "d2";
@@ -1051,7 +1052,7 @@ TEST_F(TreeQueryByRowTest, 
MultiPath_TimeHint_SkipsStaleChunk) {
 // Same stale-chunk scenario but with offset/limit applied on top.
 // offset=60, limit=10 -> rows t=60..69; d2 is null for all of them.
 // Verifies that offset counting is not confused by the skipped stale chunk.
-TEST_F(TreeQueryByRowTest, MultiPath_TimeHint_SkipsStaleChunk_WithOffset) {
+TEST_P(TreeQueryByRowTest, MultiPath_TimeHint_SkipsStaleChunk_WithOffset) {
     {
         TsFileTreeWriter writer(&write_file_);
         std::string d1 = "d1", d2 = "d2";
@@ -1103,7 +1104,7 @@ TEST_F(TreeQueryByRowTest, 
MultiPath_TimeHint_SkipsStaleChunk_WithOffset) {
 // Pushdown is faster than full query + manual next: queryByRow(offset, limit)
 // skips at Chunk/Page level; old query then manual next decodes every row.
 // Timing tolerance 20% to allow measurement noise.
-TEST_F(TreeQueryByRowTest, DISABLED_QueryByRowFasterThanManualNext) {
+TEST_P(TreeQueryByRowTest, DISABLED_QueryByRowFasterThanManualNext) {
     std::vector<std::string> devices = {"d1"};
     std::vector<std::string> measurements = {"s1"};
     const int num_rows = 8000;
@@ -1177,3 +1178,8 @@ TEST_F(TreeQueryByRowTest, 
DISABLED_QueryByRowFasterThanManualNext) {
            "(min_by_row="
         << min_by_row << " ms, min_manual=" << min_manual << " ms)";
 }
+
+INSTANTIATE_TEST_SUITE_P(Serial, TreeQueryByRowTest,
+                         ::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TreeQueryByRowTest,
+                         ::testing::Values(true));
diff --git a/cpp/test/reader/tsfile_reader_test.cc 
b/cpp/test/reader/tsfile_reader_test.cc
index 54127e07..88febc0f 100644
--- a/cpp/test/reader/tsfile_reader_test.cc
+++ b/cpp/test/reader/tsfile_reader_test.cc
@@ -29,17 +29,19 @@
 #include "common/tablet.h"
 #include "file/tsfile_io_writer.h"
 #include "file/write_file.h"
+#include "common/global.h"
 #include "reader/qds_without_timegenerator.h"
 #include "writer/tsfile_writer.h"
 
 using namespace storage;
 using namespace common;
 
-class TsFileReaderTest : public ::testing::Test {
+class TsFileReaderTest : public ::testing::TestWithParam<bool> {
    protected:
     void SetUp() override {
         tsfile_writer_ = new TsFileWriter();
         libtsfile_init();
+        set_parallel_read_enabled(GetParam());
         file_name_ = std::string("tsfile_writer_test_") +
                      generate_random_string(10) + std::string(".tsfile");
         remove(file_name_.c_str());
@@ -113,7 +115,7 @@ class TsFileReaderTest : public ::testing::Test {
     }
 };
 
-TEST_F(TsFileReaderTest, ResultSetMetadata) {
+TEST_P(TsFileReaderTest, ResultSetMetadata) {
     std::string device_path = "device1";
     std::string measurement_name = "temperature";
     common::TSDataType data_type = common::TSDataType::INT32;
@@ -154,7 +156,7 @@ TEST_F(TsFileReaderTest, ResultSetMetadata) {
     reader.close();
 }
 
-TEST_F(TsFileReaderTest, GetAllDevice) {
+TEST_P(TsFileReaderTest, GetAllDevice) {
     std::string measurement_name = "temperature";
     common::TSDataType data_type = common::TSDataType::INT32;
     common::TSEncoding encoding = common::TSEncoding::PLAIN;
@@ -197,7 +199,7 @@ TEST_F(TsFileReaderTest, GetAllDevice) {
     }
 }
 
-TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
+TEST_P(TsFileReaderTest, GetTimeseriesSchema) {
     std::vector<std::string> device_path = {"device.ln1", "device.ln2 "};
     std::vector<std::string> measurement_name = {"temperature", "humidity"};
     common::TSDataType data_type = common::TSDataType::INT32;
@@ -267,7 +269,7 @@ TEST_F(TsFileReaderTest, GetTimeseriesSchema) {
 static const int64_t kLargeFileNumRecords = 300000000;
 static const int64_t kLargeFileFlushBatch = 100000;
 
-TEST_F(TsFileReaderTest,
+TEST_P(TsFileReaderTest,
        DISABLED_LargeFileNoEncodingNoCompression_WriteAndRead) {
     std::string device_path = "device1";
     std::string measurement_name = "temperature";
@@ -325,3 +327,6 @@ TEST_F(TsFileReaderTest,
     reader.destroy_query_data_set(qds);
     reader.close();
 }
+
+INSTANTIATE_TEST_SUITE_P(Serial, TsFileReaderTest, ::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TsFileReaderTest, ::testing::Values(true));

Reply via email to