This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch experiment
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit 344ee5db61f178737255a1887eb4867bab6bc11b
Author: ColinLee <[email protected]>
AuthorDate: Sun Mar 22 11:30:11 2026 +0800

    experiment.
---
 cpp/src/common/allocator/byte_stream.h |  46 ++++---
 cpp/src/common/container/bit_map.h     |   3 +
 cpp/src/common/statistic.h             | 192 ++++++++++++++++++++++++++
 cpp/src/common/tablet.cc               |  90 ++++++++++++
 cpp/src/common/tablet.h                |  12 ++
 cpp/src/common/thread_pool.h           |  86 ++++++++++++
 cpp/src/encoding/encoder.h             |  56 ++++++++
 cpp/src/encoding/plain_encoder.h       | 102 ++++++++++++++
 cpp/src/writer/chunk_writer.h          |  30 ++++
 cpp/src/writer/page_writer.h           |  17 ++-
 cpp/src/writer/time_chunk_writer.h     |  27 ++++
 cpp/src/writer/time_page_writer.h      |  24 +++-
 cpp/src/writer/tsfile_writer.cc        | 245 ++++++++++++++++++++++++++++++---
 cpp/src/writer/tsfile_writer.h         |  13 ++
 cpp/src/writer/value_chunk_writer.h    |  31 +++++
 cpp/src/writer/value_page_writer.h     |  59 +++++++-
 16 files changed, 988 insertions(+), 45 deletions(-)

diff --git a/cpp/src/common/allocator/byte_stream.h b/cpp/src/common/allocator/byte_stream.h
index 4e1029ea4..7abc2b4f0 100644
--- a/cpp/src/common/allocator/byte_stream.h
+++ b/cpp/src/common/allocator/byte_stream.h
@@ -253,6 +253,8 @@ class ByteStream {
     };
 
    public:
+    static const uint32_t DEFAULT_PAGE_SIZE = 1024;
+
     ByteStream(uint32_t page_size, AllocModID mid, bool enable_atomic = false,
                BaseAllocator& allocator = g_base_allocator)
         : allocator_(allocator),
@@ -263,10 +265,9 @@ class ByteStream {
           read_pos_(0),
           marked_read_pos_(0),
           page_size_(page_size),
+          page_mask_(page_size - 1),
           mid_(mid),
-          wrapped_page_(false, nullptr) {
-        // assert(page_size >= 16);  // commented out by gxh on 2023.03.09
-    }
+          wrapped_page_(false, nullptr) {}
 
     // TODO use a specific construct function to mark it as wrapped use.
     // for wrap plain buffer to ByteStream
@@ -279,6 +280,7 @@ class ByteStream {
           read_pos_(0),
           marked_read_pos_(0),
           page_size_(0),
+          page_mask_(0),
           mid_(MOD_DEFAULT),
           wrapped_page_(false, nullptr) {}
 
@@ -292,6 +294,7 @@ class ByteStream {
         wrapped_page_.buf_ = (uint8_t*)buf;
 
         page_size_ = buf_len;
+        page_mask_ = buf_len - 1;
         head_.store(&wrapped_page_);
         tail_.store(&wrapped_page_);
         total_size_.store(buf_len);
@@ -340,6 +343,7 @@ class ByteStream {
     // never used TODO
     void shallow_clone_from(ByteStream& other) {
         this->page_size_ = other.page_size_;
+        this->page_mask_ = other.page_mask_;
         this->mid_ = other.mid_;
         this->head_.store(other.head_.load());
         this->tail_.store(other.tail_.load());
@@ -366,10 +370,10 @@ class ByteStream {
                 std::cout << "write_buf error " << ret << std::endl;
                 return ret;
             }
-            uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
+            uint32_t remainder = page_size_ - (total_size_.load() & page_mask_);
             uint32_t copy_len =
                 remainder < (len - write_len) ? remainder : (len - write_len);
-            memcpy(tail_.load()->buf_ + total_size_.load() % page_size_,
+            memcpy(tail_.load()->buf_ + (total_size_.load() & page_mask_),
                    buf + write_len, copy_len);
             total_size_.atomic_aaf(copy_len);
             write_len += copy_len;
@@ -390,11 +394,11 @@ class ByteStream {
             if (RET_FAIL(check_space())) {
                 return ret;
             }
-            uint32_t remainder = page_size_ - (read_pos_ % page_size_);
+            uint32_t remainder = page_size_ - (read_pos_ & page_mask_);
             uint32_t copy_len = remainder < want_len_limited - read_len
                                     ? remainder
                                     : want_len_limited - read_len;
-            memcpy(buf + read_len, read_page_->buf_ + (read_pos_ % page_size_),
+            memcpy(buf + read_len, read_page_->buf_ + (read_pos_ & page_mask_),
                    copy_len);
             read_len += copy_len;
             read_pos_ += copy_len;
@@ -446,16 +450,17 @@ class ByteStream {
             return b;
         }
         b.buf_ =
-            (char*)(tail_.load()->buf_ + (total_size_.load() % page_size_));
-        b.len_ = page_size_ - (total_size_.load() % page_size_);
+            (char*)(tail_.load()->buf_ + (total_size_.load() & page_mask_));
+        b.len_ = page_size_ - (total_size_.load() & page_mask_);
         return b;
     }
 
     void buffer_used(uint32_t used_bytes) {
         ASSERT(used_bytes >= 1);
         // would not span page
-        ASSERT((total_size_.load() / page_size_) ==
-               ((total_size_.load() + used_bytes - 1) / page_size_));
+        ASSERT(page_size_ == 0 ||
+               (total_size_.load() / page_size_) ==
+                   ((total_size_.load() + used_bytes - 1) / page_size_));
         total_size_.atomic_aaf(used_bytes);
     }
 
@@ -471,7 +476,7 @@ class ByteStream {
             if (RET_FAIL(prepare_space())) {
                 return ret;
             }
-            uint32_t remainder = page_size_ - (total_size_.load() % page_size_);
+            uint32_t remainder = page_size_ - (total_size_.load() & page_mask_);
             uint32_t step =
                 remainder < (len - advanced) ? remainder : (len - advanced);
             total_size_.atomic_aaf(step);
@@ -501,8 +506,8 @@ class ByteStream {
             if (cur_ != nullptr) {
                 b.buf_ = (char*)cur_->buf_;
                 if (cur_ == end_ &&
-                    host_.total_size_.load() % host_.page_size_ != 0) {
-                    b.len_ = host_.total_size_.load() % host_.page_size_;
+                    (host_.total_size_.load() & host_.page_mask_) != 0) {
+                    b.len_ = host_.total_size_.load() & host_.page_mask_;
                 } else {
                     b.len_ = host_.page_size_;
                 }
@@ -559,7 +564,7 @@ class ByteStream {
 
             while (true) {
                 if (cur_ == host_end) {
-                    if (host_total_size % host_.page_size_ == 0) {
+                    if ((host_total_size & host_.page_mask_) == 0) {
                         if (read_offset_within_cur_page_ == host_.page_size_) {
                             return b;
                         } else {
@@ -573,15 +578,15 @@ class ByteStream {
                         }
                     } else {
                         if (read_offset_within_cur_page_ ==
-                            (host_total_size % host_.page_size_)) {
+                            (host_total_size & host_.page_mask_)) {
                             return b;
                         } else {
                             b.buf_ = ((char*)(cur_->buf_)) +
                                      read_offset_within_cur_page_;
-                            b.len_ = (host_total_size % host_.page_size_) -
+                            b.len_ = (host_total_size & host_.page_mask_) -
                                      read_offset_within_cur_page_;
                             read_offset_within_cur_page_ =
-                                (host_total_size % host_.page_size_);
+                                (host_total_size & host_.page_mask_);
                             total_end_offset_ += b.len_;
                             return b;
                         }
@@ -611,7 +616,7 @@ class ByteStream {
     FORCE_INLINE int prepare_space() {
         int ret = common::E_OK;
         if (UNLIKELY(tail_.load() == nullptr ||
-                     total_size_.load() % page_size_ == 0)) {
+                     (total_size_.load() & page_mask_) == 0)) {
             Page* p = nullptr;
             if (RET_FAIL(alloc_page(p))) {
                 return ret;
@@ -628,7 +633,7 @@ class ByteStream {
         }
         if (UNLIKELY(read_page_ == nullptr)) {
             read_page_ = head_.load();
-        } else if (UNLIKELY(read_pos_ % page_size_ == 0)) {
+        } else if (UNLIKELY((read_pos_ & page_mask_) == 0)) {
             read_page_ = read_page_->next_.load();
         }
         if (UNLIKELY(read_page_ == nullptr)) {
@@ -668,6 +673,7 @@ class ByteStream {
     uint32_t read_pos_;                    // current reader position
     uint32_t marked_read_pos_;             // current reader position
     uint32_t page_size_;
+    uint32_t page_mask_;  // page_size_ - 1; AND == modulo only for power-of-2 sizes
     AllocModID mid_;
     Page wrapped_page_;
 };
diff --git a/cpp/src/common/container/bit_map.h b/cpp/src/common/container/bit_map.h
index 356932d14..ea4fd1a5b 100644
--- a/cpp/src/common/container/bit_map.h
+++ b/cpp/src/common/container/bit_map.h
@@ -64,6 +64,9 @@ class BitMap {
         return (*start_addr & bit_mask);
     }
 
+    // Set all bits to 0 (all non-null in TsFile convention where bit=1 is null)
+    FORCE_INLINE void clear_all() { memset(bitmap_, 0x00, size_); }
+
     FORCE_INLINE uint32_t get_size() { return size_; }
 
     FORCE_INLINE char* get_bitmap() { return bitmap_; }  // for debug
diff --git a/cpp/src/common/statistic.h b/cpp/src/common/statistic.h
index f6d53c206..7128b43b8 100644
--- a/cpp/src/common/statistic.h
+++ b/cpp/src/common/statistic.h
@@ -22,12 +22,18 @@
 
 #include <inttypes.h>
 
+#include <algorithm>
 #include <sstream>
 
 #include "common/allocator/alloc_base.h"
 #include "common/allocator/byte_stream.h"
 #include "common/db_common.h"
 
+#if defined(__ARM_NEON) || defined(__ARM_NEON__)
+#include <arm_neon.h>
+#define TSFILE_HAS_NEON 1
+#endif
+
 namespace storage {
 
 /*
@@ -176,6 +182,48 @@ class Statistic {
     }
     virtual FORCE_INLINE void update(int64_t time) { ASSERT(false); }
 
+    virtual void update_time_batch(const int64_t* timestamps, uint32_t count) {
+        for (uint32_t i = 0; i < count; i++) {
+            update(timestamps[i]);
+        }
+    }
+    virtual void update_batch(const int64_t* timestamps,
+                              const bool* values, uint32_t count) {
+        for (uint32_t i = 0; i < count; i++) {
+            update(timestamps[i], values[i]);
+        }
+    }
+    virtual void update_batch(const int64_t* timestamps,
+                              const int32_t* values, uint32_t count) {
+        for (uint32_t i = 0; i < count; i++) {
+            update(timestamps[i], values[i]);
+        }
+    }
+    virtual void update_batch(const int64_t* timestamps,
+                              const int64_t* values, uint32_t count) {
+        for (uint32_t i = 0; i < count; i++) {
+            update(timestamps[i], values[i]);
+        }
+    }
+    virtual void update_batch(const int64_t* timestamps,
+                              const float* values, uint32_t count) {
+        for (uint32_t i = 0; i < count; i++) {
+            update(timestamps[i], values[i]);
+        }
+    }
+    virtual void update_batch(const int64_t* timestamps,
+                              const double* values, uint32_t count) {
+        for (uint32_t i = 0; i < count; i++) {
+            update(timestamps[i], values[i]);
+        }
+    }
+    virtual void update_batch(const int64_t* timestamps,
+                              const common::String* values, uint32_t count) {
+        for (uint32_t i = 0; i < count; i++) {
+            update(timestamps[i], values[i]);
+        }
+    }
+
     virtual int serialize_to(common::ByteStream& out) {
         int ret = common::E_OK;
         if (RET_FAIL(common::SerializationUtil::write_var_uint(count_, out))) {
@@ -638,6 +686,32 @@ class Int32Statistic : public Statistic {
         NUM_STAT_UPDATE(time, value);
     }
 
+    void update_batch(const int64_t* timestamps, const int32_t* values,
+                      uint32_t count) override {
+        if (count == 0) return;
+        uint32_t start = 0;
+        if (count_ == 0) {
+            start_time_ = timestamps[0];
+            end_time_ = timestamps[0];
+            first_value_ = values[0];
+            last_value_ = values[0];
+            min_value_ = values[0];
+            max_value_ = values[0];
+            sum_value_ = (int64_t)values[0];
+            count_ = 1;
+            start = 1;
+        }
+        for (uint32_t i = start; i < count; i++) {
+            if (timestamps[i] < start_time_) start_time_ = timestamps[i];
+            if (timestamps[i] > end_time_) end_time_ = timestamps[i];
+            if (values[i] < min_value_) min_value_ = values[i];
+            if (values[i] > max_value_) max_value_ = values[i];
+            sum_value_ += (int64_t)values[i];
+        }
+        last_value_ = values[count - 1];
+        count_ += (count - start);
+    }
+
     FORCE_INLINE common::TSDataType get_type() { return common::INT32; }
 
     int serialize_typed_stat(common::ByteStream& out) {
@@ -738,6 +812,60 @@ class Int64Statistic : public Statistic {
         NUM_STAT_UPDATE(time, value);
     }
 
+    void update_batch(const int64_t* timestamps, const int64_t* values,
+                      uint32_t count) override {
+        if (count == 0) return;
+        uint32_t start = 0;
+        if (count_ == 0) {
+            start_time_ = timestamps[0];
+            end_time_ = timestamps[0];
+            first_value_ = values[0];
+            last_value_ = values[0];
+            min_value_ = values[0];
+            max_value_ = values[0];
+            sum_value_ = (double)values[0];
+            count_ = 1;
+            start = 1;
+        }
+        // Timestamps are monotonic (verified by TimePageWriter),
+        // so only first/last matter for start_time_/end_time_.
+        if (count > start) {
+            if (timestamps[start] < start_time_)
+                start_time_ = timestamps[start];
+            if (timestamps[count - 1] > end_time_)
+                end_time_ = timestamps[count - 1];
+        }
+        uint32_t i = start;
+#if TSFILE_HAS_NEON
+        {
+            int64x2_t vmin = vdupq_n_s64(min_value_);
+            int64x2_t vmax = vdupq_n_s64(max_value_);
+            float64x2_t vsum = vdupq_n_f64(0.0);
+            for (; i + 2 <= count; i += 2) {
+                int64x2_t v = vld1q_s64(&values[i]);
+                // min/max via compare+select (no vminq_s64 in NEON)
+                uint64x2_t lt = vcltq_s64(v, vmin);
+                vmin = vbslq_s64(lt, v, vmin);
+                uint64x2_t gt = vcgtq_s64(v, vmax);
+                vmax = vbslq_s64(gt, v, vmax);
+                vsum = vaddq_f64(vsum, vcvtq_f64_s64(v));
+            }
+            min_value_ = std::min(vgetq_lane_s64(vmin, 0),
+                                  vgetq_lane_s64(vmin, 1));
+            max_value_ = std::max(vgetq_lane_s64(vmax, 0),
+                                  vgetq_lane_s64(vmax, 1));
+            sum_value_ += vgetq_lane_f64(vsum, 0) + vgetq_lane_f64(vsum, 1);
+        }
+#endif
+        for (; i < count; i++) {
+            if (values[i] < min_value_) min_value_ = values[i];
+            if (values[i] > max_value_) max_value_ = values[i];
+            sum_value_ += (double)values[i];
+        }
+        last_value_ = values[count - 1];
+        count_ += (count - start);
+    }
+
     FORCE_INLINE common::TSDataType get_type() { return common::INT64; }
 
     int serialize_typed_stat(common::ByteStream& out) {
@@ -904,6 +1032,55 @@ class DoubleStatistic : public Statistic {
         NUM_STAT_UPDATE(time, value);
     }
 
+    void update_batch(const int64_t* timestamps, const double* values,
+                      uint32_t count) override {
+        if (count == 0) return;
+        uint32_t start = 0;
+        if (count_ == 0) {
+            start_time_ = timestamps[0];
+            end_time_ = timestamps[0];
+            first_value_ = values[0];
+            last_value_ = values[0];
+            min_value_ = values[0];
+            max_value_ = values[0];
+            sum_value_ = values[0];
+            count_ = 1;
+            start = 1;
+        }
+        if (count > start) {
+            if (timestamps[start] < start_time_)
+                start_time_ = timestamps[start];
+            if (timestamps[count - 1] > end_time_)
+                end_time_ = timestamps[count - 1];
+        }
+        uint32_t i = start;
+#if TSFILE_HAS_NEON
+        {
+            float64x2_t vmin = vdupq_n_f64(min_value_);
+            float64x2_t vmax = vdupq_n_f64(max_value_);
+            float64x2_t vsum = vdupq_n_f64(0.0);
+            for (; i + 2 <= count; i += 2) {
+                float64x2_t v = vld1q_f64(&values[i]);
+                vmin = vminq_f64(vmin, v);
+                vmax = vmaxq_f64(vmax, v);
+                vsum = vaddq_f64(vsum, v);
+            }
+            min_value_ = std::min(vgetq_lane_f64(vmin, 0),
+                                  vgetq_lane_f64(vmin, 1));
+            max_value_ = std::max(vgetq_lane_f64(vmax, 0),
+                                  vgetq_lane_f64(vmax, 1));
+            sum_value_ += vgetq_lane_f64(vsum, 0) + vgetq_lane_f64(vsum, 1);
+        }
+#endif
+        for (; i < count; i++) {
+            if (values[i] < min_value_) min_value_ = values[i];
+            if (values[i] > max_value_) max_value_ = values[i];
+            sum_value_ += values[i];
+        }
+        last_value_ = values[count - 1];
+        count_ += (count - start);
+    }
+
     FORCE_INLINE common::TSDataType get_type() { return common::DOUBLE; }
 
     int serialize_typed_stat(common::ByteStream& out) {
@@ -971,6 +1148,21 @@ class TimeStatistic : public Statistic {
         count_++;
     }
 
+    void update_time_batch(const int64_t* timestamps,
+                           uint32_t count) override {
+        if (count == 0) return;
+        if (count_ == 0) {
+            start_time_ = timestamps[0];
+            end_time_ = timestamps[0];
+        }
+        // Timestamps are already verified monotonic in TimePageWriter,
+        // so first element is min candidate and last is max candidate.
+        if (timestamps[0] < start_time_) start_time_ = timestamps[0];
+        if (timestamps[count - 1] > end_time_)
+            end_time_ = timestamps[count - 1];
+        count_ += count;
+    }
+
     FORCE_INLINE common::TSDataType get_type() { return common::VECTOR; }
 
     int serialize_typed_stat(common::ByteStream& out) { return common::E_OK; }
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index f274c82d5..71e189d3c 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -166,6 +166,96 @@ int Tablet::add_timestamp(uint32_t row_index, int64_t timestamp) {
     return E_OK;
 }
 
+int Tablet::set_timestamps(const int64_t* timestamps, uint32_t count) {
+    if (err_code_ != E_OK) return err_code_;
+    ASSERT(timestamps_ != NULL);
+    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
+        return E_OUT_OF_RANGE;
+    }
+    std::memcpy(timestamps_, timestamps, count * sizeof(int64_t));
+    cur_row_size_ = std::max(count, cur_row_size_);
+    return E_OK;
+}
+
+int Tablet::set_column_values(uint32_t schema_index, const void* data,
+                              const uint8_t* bitmap, uint32_t count) {
+    if (err_code_ != E_OK) return err_code_;
+    if (UNLIKELY(schema_index >= schema_vec_->size())) return E_OUT_OF_RANGE;
+    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_)))
+        return E_OUT_OF_RANGE;
+
+    const MeasurementSchema& schema = schema_vec_->at(schema_index);
+    size_t elem_size = 0;
+    void* dst = nullptr;
+    switch (schema.data_type_) {
+        case BOOLEAN:
+            elem_size = sizeof(bool);
+            dst = value_matrix_[schema_index].bool_data;
+            break;
+        case DATE:
+        case INT32:
+            elem_size = sizeof(int32_t);
+            dst = value_matrix_[schema_index].int32_data;
+            break;
+        case TIMESTAMP:
+        case INT64:
+            elem_size = sizeof(int64_t);
+            dst = value_matrix_[schema_index].int64_data;
+            break;
+        case FLOAT:
+            elem_size = sizeof(float);
+            dst = value_matrix_[schema_index].float_data;
+            break;
+        case DOUBLE:
+            elem_size = sizeof(double);
+            dst = value_matrix_[schema_index].double_data;
+            break;
+        default:
+            return E_NOT_SUPPORT;
+    }
+
+    std::memcpy(dst, data, count * elem_size);
+    if (bitmap == nullptr) {
+        bitmaps_[schema_index].clear_all();
+    } else {
+        char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
+        uint32_t bm_bytes = (count + 7) / 8;
+        std::memcpy(tsfile_bm, bitmap, bm_bytes);
+    }
+    cur_row_size_ = std::max(count, cur_row_size_);
+    return E_OK;
+}
+
+int Tablet::set_column_string_repeated(uint32_t schema_index, const char* str,
+                                       uint32_t str_len, uint32_t count) {
+    if (err_code_ != E_OK) return err_code_;
+    if (UNLIKELY(schema_index >= schema_vec_->size())) return E_OUT_OF_RANGE;
+    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_)))
+        return E_OUT_OF_RANGE;
+
+    StringColumn* sc = value_matrix_[schema_index].string_col;
+    if (sc == nullptr) return E_INVALID_ARG;
+
+    // Pre-allocate buffer for all identical strings
+    uint32_t total_bytes = str_len * count;
+    if (total_bytes > sc->buf_capacity) {
+        sc->buf_capacity = total_bytes;
+        sc->buffer = (char*)mem_realloc(sc->buffer, sc->buf_capacity);
+    }
+
+    // Fill offsets and buffer in bulk
+    for (uint32_t i = 0; i < count; i++) {
+        sc->offsets[i] = i * str_len;
+        memcpy(sc->buffer + i * str_len, str, str_len);
+    }
+    sc->offsets[count] = total_bytes;
+    sc->buf_used = total_bytes;
+
+    bitmaps_[schema_index].clear_all();
+    cur_row_size_ = std::max(count, cur_row_size_);
+    return E_OK;
+}
+
 void* Tablet::get_value(int row_index, uint32_t schema_index,
                         common::TSDataType& data_type) const {
     if (UNLIKELY(schema_index >= schema_vec_->size())) {
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index 2611220d4..9770db246 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -245,6 +245,18 @@ class Tablet {
      */
     int add_timestamp(uint32_t row_index, int64_t timestamp);
 
+    // Bulk copy timestamps via memcpy. count must be <= max_row_num_.
+    int set_timestamps(const int64_t* timestamps, uint32_t count);
+
+    // Bulk copy fixed-length column data. bitmap=nullptr means all non-null.
+    // bitmap uses TsFile convention: bit=1 is null, bit=0 is valid.
+    int set_column_values(uint32_t schema_index, const void* data,
+                          const uint8_t* bitmap, uint32_t count);
+
+    // Bulk fill a STRING column with the same value for all rows.
+    int set_column_string_repeated(uint32_t schema_index, const char* str,
+                                   uint32_t str_len, uint32_t count);
+
     void* get_value(int row_index, uint32_t schema_index,
                     common::TSDataType& data_type) const;
     /**
diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h
new file mode 100644
index 000000000..a2f728722
--- /dev/null
+++ b/cpp/src/common/thread_pool.h
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef COMMON_THREAD_POOL_H
+#define COMMON_THREAD_POOL_H
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <vector>
+
+namespace common {
+
+class ThreadPool {
+   public:
+    explicit ThreadPool(size_t num_threads) : stop_(false) {
+        for (size_t i = 0; i < num_threads; i++) {
+            workers_.emplace_back([this] {
+                for (;;) {
+                    std::function<void()> task;
+                    {
+                        std::unique_lock<std::mutex> lock(mutex_);
+                        cv_.wait(lock,
+                                 [this] { return stop_ || !tasks_.empty(); });
+                        if (stop_ && tasks_.empty()) return;
+                        task = std::move(tasks_.front());
+                        tasks_.pop();
+                    }
+                    task();
+                }
+            });
+        }
+    }
+
+    ~ThreadPool() {
+        {
+            std::unique_lock<std::mutex> lock(mutex_);
+            stop_ = true;
+        }
+        cv_.notify_all();
+        for (auto& w : workers_) w.join();
+    }
+
+    template <typename F>
+    std::future<typename std::result_of<F()>::type> submit(F&& f) {
+        using RetType = typename std::result_of<F()>::type;
+        auto task = std::make_shared<std::packaged_task<RetType()>>(
+            std::forward<F>(f));
+        std::future<RetType> result = task->get_future();
+        {
+            std::unique_lock<std::mutex> lock(mutex_);
+            tasks_.emplace([task]() { (*task)(); });
+        }
+        cv_.notify_one();
+        return result;
+    }
+
+   private:
+    std::vector<std::thread> workers_;
+    std::queue<std::function<void()>> tasks_;
+    std::mutex mutex_;
+    std::condition_variable cv_;
+    bool stop_;
+};
+
+}  // namespace common
+
+#endif  // COMMON_THREAD_POOL_H
diff --git a/cpp/src/encoding/encoder.h b/cpp/src/encoding/encoder.h
index 921686446..f34a05e64 100644
--- a/cpp/src/encoding/encoder.h
+++ b/cpp/src/encoding/encoder.h
@@ -48,6 +48,62 @@ class Encoder {
      * @return the maximal size of possible memory occupied by current encoder
      */
     virtual int get_max_byte_size() = 0;
+
+    /*
+     * Batch encoding interfaces.
+     * Default implementations fall back to per-value encode().
+     * Subclasses may override for better performance.
+     */
+    virtual int encode_batch(const bool* values, uint32_t count,
+                             common::ByteStream& out_stream) {
+        int ret = common::E_OK;
+        for (uint32_t i = 0; i < count; i++) {
+            if (RET_FAIL(encode(values[i], out_stream))) {
+                return ret;
+            }
+        }
+        return ret;
+    }
+    virtual int encode_batch(const int32_t* values, uint32_t count,
+                             common::ByteStream& out_stream) {
+        int ret = common::E_OK;
+        for (uint32_t i = 0; i < count; i++) {
+            if (RET_FAIL(encode(values[i], out_stream))) {
+                return ret;
+            }
+        }
+        return ret;
+    }
+    virtual int encode_batch(const int64_t* values, uint32_t count,
+                             common::ByteStream& out_stream) {
+        int ret = common::E_OK;
+        for (uint32_t i = 0; i < count; i++) {
+            if (RET_FAIL(encode(values[i], out_stream))) {
+                return ret;
+            }
+        }
+        return ret;
+    }
+    virtual int encode_batch(const float* values, uint32_t count,
+                             common::ByteStream& out_stream) {
+        int ret = common::E_OK;
+        for (uint32_t i = 0; i < count; i++) {
+            if (RET_FAIL(encode(values[i], out_stream))) {
+                return ret;
+            }
+        }
+        return ret;
+    }
+    virtual int encode_batch(const double* values, uint32_t count,
+                             common::ByteStream& out_stream) {
+        int ret = common::E_OK;
+        for (uint32_t i = 0; i < count; i++) {
+            if (RET_FAIL(encode(values[i], out_stream))) {
+                return ret;
+            }
+        }
+        return ret;
+    }
 };
 
 }  // end namespace storage
diff --git a/cpp/src/encoding/plain_encoder.h b/cpp/src/encoding/plain_encoder.h
index b768c9bf0..687fa8916 100644
--- a/cpp/src/encoding/plain_encoder.h
+++ b/cpp/src/encoding/plain_encoder.h
@@ -20,8 +20,15 @@
 #ifndef ENCODING_PLAIN_ENCODER_H
 #define ENCODING_PLAIN_ENCODER_H
 
+#include <cstring>
+
 #include "encoder.h"
 
+#if defined(__ARM_NEON) || defined(__ARM_NEON__)
+#include <arm_neon.h>
+#define TSFILE_HAS_NEON 1
+#endif
+
 namespace storage {
 
 class PlainEncoder : public Encoder {
@@ -64,6 +71,101 @@ class PlainEncoder : public Encoder {
     }
 
     int get_max_byte_size() { return 0; }
+
+    // Optimized batch encoding: directly byte-swap into ByteStream page buffer.
+    // Avoids per-value write_buf overhead entirely — only calls acquire_buf()
+    // once per page boundary crossing.
+    int encode_batch(const int64_t* values, uint32_t count,
+                     common::ByteStream& out_stream) override {
+        if (count == 0) return common::E_OK;
+        uint32_t offset = 0;
+        while (offset < count) {
+            common::ByteStream::Buffer buf = out_stream.acquire_buf();
+            if (UNLIKELY(buf.buf_ == nullptr)) return common::E_OOM;
+            // How many int64 values fit in the remaining page space?
+            uint32_t capacity = buf.len_ / 8;
+            if (capacity == 0) {
+                // Page has < 8 bytes left: encode all remaining values one by one
+                return Encoder::encode_batch(values + offset, count - offset,
+                                             out_stream);
+            }
+            uint32_t batch = std::min(count - offset, capacity);
+            uint8_t* dst = (uint8_t*)buf.buf_;
+            const int64_t* src = values + offset;
+            uint32_t i = 0;
+#if TSFILE_HAS_NEON
+            // NEON: byte-reverse 2 x int64 per iteration
+            for (; i + 2 <= batch; i += 2) {
+                uint8x16_t v = vld1q_u8((const uint8_t*)&src[i]);
+                v = vrev64q_u8(v);
+                vst1q_u8(dst, v);
+                dst += 16;
+            }
+#endif
+            // Scalar tail
+            for (; i < batch; i++) {
+                uint64_t v = (uint64_t)src[i];
+                dst[0] = (uint8_t)(v >> 56);
+                dst[1] = (uint8_t)(v >> 48);
+                dst[2] = (uint8_t)(v >> 40);
+                dst[3] = (uint8_t)(v >> 32);
+                dst[4] = (uint8_t)(v >> 24);
+                dst[5] = (uint8_t)(v >> 16);
+                dst[6] = (uint8_t)(v >> 8);
+                dst[7] = (uint8_t)(v);
+                dst += 8;
+            }
+            out_stream.buffer_used(batch * 8);
+            offset += batch;
+        }
+        return common::E_OK;
+    }
+
+    int encode_batch(const double* values, uint32_t count,
+                     common::ByteStream& out_stream) override {
+        return encode_batch(reinterpret_cast<const int64_t*>(values), count,
+                            out_stream);
+    }
+
+    int encode_batch(const float* values, uint32_t count,
+                     common::ByteStream& out_stream) override {
+        if (count == 0) return common::E_OK;
+        uint32_t offset = 0;
+        while (offset < count) {
+            common::ByteStream::Buffer buf = out_stream.acquire_buf();
+            if (UNLIKELY(buf.buf_ == nullptr)) return common::E_OOM;
+            uint32_t capacity = buf.len_ / 4;
+            if (capacity == 0) {
+                return Encoder::encode_batch(values + offset, count - offset,
+                                             out_stream);
+            }
+            uint32_t batch = std::min(count - offset, capacity);
+            uint8_t* dst = (uint8_t*)buf.buf_;
+            const float* src = values + offset;
+            uint32_t i = 0;
+#if TSFILE_HAS_NEON
+            // NEON: byte-reverse 4 x float (32-bit) per iteration
+            for (; i + 4 <= batch; i += 4) {
+                uint8x16_t v = vld1q_u8((const uint8_t*)&src[i]);
+                v = vrev32q_u8(v);
+                vst1q_u8(dst, v);
+                dst += 16;
+            }
+#endif
+            for (; i < batch; i++) {
+                uint32_t v;
+                memcpy(&v, &src[i], sizeof(float));
+                dst[0] = (uint8_t)(v >> 24);
+                dst[1] = (uint8_t)(v >> 16);
+                dst[2] = (uint8_t)(v >> 8);
+                dst[3] = (uint8_t)(v);
+                dst += 4;
+            }
+            out_stream.buffer_used(batch * 4);
+            offset += batch;
+        }
+        return common::E_OK;
+    }
 };
 
 }  // end namespace storage
diff --git a/cpp/src/writer/chunk_writer.h b/cpp/src/writer/chunk_writer.h
index 3032ff9a5..e88747531 100644
--- a/cpp/src/writer/chunk_writer.h
+++ b/cpp/src/writer/chunk_writer.h
@@ -103,6 +103,36 @@ class ChunkWriter {
         CW_DO_WRITE_FOR_TYPE();
     }
 
+    /**
+     * Write a batch of (timestamp, value) pairs, slicing it so that each
+     * call into the page writer carries at most the number of points the
+     * current page still has room for
+     * (common::g_config_value_.page_writer_max_point_num_ minus the points
+     * already in the page).
+     *
+     * @param timestamps first timestamp of the batch
+     * @param values     first value of the batch, parallel to `timestamps`
+     * @param count      number of pairs to write
+     * @return common::E_OK, or the first error returned by seal_cur_page(),
+     *         page_writer_.write_batch() or seal_cur_page_if_full()
+     */
+    template <typename T>
+    int write_batch(const int64_t* timestamps, const T* values,
+                    uint32_t count) {
+        int ret = common::E_OK;
+        const uint32_t max_points =
+            common::g_config_value_.page_writer_max_point_num_;
+        uint32_t offset = 0;
+        while (offset < count) {
+            const uint32_t cur_points = page_writer_.get_point_numer();
+            uint32_t page_remaining = 0;
+            if (cur_points < max_points) {
+                page_remaining = max_points - cur_points;
+            } else {
+                // The page is full. Comparing first, instead of testing the
+                // difference for zero, keeps the unsigned subtraction from
+                // wrapping to a huge value when the page already holds more
+                // points than the configured maximum.
+                if (RET_FAIL(seal_cur_page(false))) {
+                    return ret;
+                }
+                page_remaining = max_points;
+            }
+            const uint32_t batch_size =
+                std::min(count - offset, page_remaining);
+            if (RET_FAIL(page_writer_.write_batch(timestamps + offset,
+                                                   values + offset,
+                                                   batch_size))) {
+                return ret;
+            }
+            offset += batch_size;
+            if (RET_FAIL(seal_cur_page_if_full())) {
+                return ret;
+            }
+        }
+        return ret;
+    }
+
     int end_encode_chunk();
     common::ByteStream& get_chunk_data() { return chunk_data_; }
     Statistic* get_chunk_statistic() { return chunk_statistic_; }
diff --git a/cpp/src/writer/page_writer.h b/cpp/src/writer/page_writer.h
index cff4b60ed..f34e7de2f 100644
--- a/cpp/src/writer/page_writer.h
+++ b/cpp/src/writer/page_writer.h
@@ -150,6 +150,21 @@ class PageWriter {
         PW_DO_WRITE_FOR_TYPE();
     }
 
+    /**
+     * Encode `count` timestamps and `count` values into the page's two
+     * output streams, then fold the batch into the page statistic.
+     * The statistic is only updated when both encoders succeed.
+     *
+     * @param timestamps first timestamp of the batch
+     * @param values     first value of the batch, parallel to `timestamps`
+     * @param count      number of pairs; 0 is a no-op
+     * @return common::E_OK, or the error reported by the failing encoder
+     */
+    template <typename T>
+    FORCE_INLINE int write_batch(const int64_t* timestamps, const T* values,
+                                 uint32_t count) {
+        int ret = common::E_OK;
+        if (count == 0) {
+            return ret;
+        }
+        // Time column first; a failure here leaves the value stream alone.
+        if (RET_FAIL(time_encoder_->encode_batch(timestamps, count,
+                                                  time_out_stream_))) {
+            return ret;
+        }
+        if (RET_FAIL(value_encoder_->encode_batch(values, count,
+                                                   value_out_stream_))) {
+            return ret;
+        }
+        statistic_->update_batch(timestamps, values, count);
+        return ret;
+    }
+
     FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; 
}
     FORCE_INLINE uint32_t get_time_out_stream_size() const {
         return time_out_stream_.total_size();
@@ -194,7 +209,7 @@ class PageWriter {
 
    private:
     // static const uint32_t OUT_STREAM_PAGE_SIZE = 48;
-    static const uint32_t OUT_STREAM_PAGE_SIZE = 1024;
+    static const uint32_t OUT_STREAM_PAGE_SIZE = 65536;
 
    private:
     common::TSDataType data_type_;
diff --git a/cpp/src/writer/time_chunk_writer.h 
b/cpp/src/writer/time_chunk_writer.h
index ac3b374b0..c071551bb 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -63,6 +63,33 @@ class TimeChunkWriter {
         return ret;
     }
 
+    /**
+     * Write a batch of timestamps, slicing it so that each call into the
+     * time page writer carries at most the number of points the current
+     * page still has room for
+     * (common::g_config_value_.page_writer_max_point_num_ minus the points
+     * already in the page).
+     *
+     * @param timestamps first timestamp of the batch
+     * @param count      number of timestamps to write
+     * @return common::E_OK, or the first error returned by seal_cur_page(),
+     *         time_page_writer_.write_batch() or seal_cur_page_if_full()
+     */
+    int write_batch(const int64_t* timestamps, uint32_t count) {
+        int ret = common::E_OK;
+        const uint32_t max_points =
+            common::g_config_value_.page_writer_max_point_num_;
+        uint32_t offset = 0;
+        while (offset < count) {
+            const uint32_t cur_points = time_page_writer_.get_point_numer();
+            uint32_t page_remaining = 0;
+            if (cur_points < max_points) {
+                page_remaining = max_points - cur_points;
+            } else {
+                // The page is full. Comparing first, instead of testing the
+                // difference for zero, keeps the unsigned subtraction from
+                // wrapping to a huge value when the page already holds more
+                // points than the configured maximum.
+                if (RET_FAIL(seal_cur_page(false))) {
+                    return ret;
+                }
+                page_remaining = max_points;
+            }
+            const uint32_t batch_size =
+                std::min(count - offset, page_remaining);
+            if (RET_FAIL(time_page_writer_.write_batch(timestamps + offset,
+                                                        batch_size))) {
+                return ret;
+            }
+            offset += batch_size;
+            if (RET_FAIL(seal_cur_page_if_full())) {
+                return ret;
+            }
+        }
+        return ret;
+    }
+
     int end_encode_chunk();
     common::ByteStream& get_chunk_data() { return chunk_data_; }
     Statistic* get_chunk_statistic() { return chunk_statistic_; }
diff --git a/cpp/src/writer/time_page_writer.h 
b/cpp/src/writer/time_page_writer.h
index 4c01044a6..7b749f7b2 100644
--- a/cpp/src/writer/time_page_writer.h
+++ b/cpp/src/writer/time_page_writer.h
@@ -84,6 +84,28 @@ class TimePageWriter {
         return ret;
     }
 
+    /**
+     * Append `count` timestamps to the page after verifying that they are
+     * strictly increasing, both relative to the page's current end time
+     * and within the batch itself. Nothing is encoded when the check fails.
+     *
+     * @param timestamps first timestamp of the batch
+     * @param count      number of timestamps; 0 is a no-op
+     * @return common::E_OK, common::E_OUT_OF_ORDER when the ordering check
+     *         fails, or the error reported by the time encoder
+     */
+    int write_batch(const int64_t* timestamps, uint32_t count) {
+        int ret = common::E_OK;
+        if (count == 0) {
+            return ret;
+        }
+        // The batch must start strictly after the last timestamp already
+        // recorded in this page (only checked once the page holds data).
+        const bool page_has_data = statistic_->count_ != 0 && is_inited_;
+        if (page_has_data && timestamps[0] <= statistic_->end_time_) {
+            return common::E_OUT_OF_ORDER;
+        }
+        // Every timestamp must be strictly greater than its predecessor.
+        int64_t previous = timestamps[0];
+        for (uint32_t i = 1; i < count; ++i) {
+            const int64_t current = timestamps[i];
+            if (current <= previous) {
+                return common::E_OUT_OF_ORDER;
+            }
+            previous = current;
+        }
+        if (RET_FAIL(time_encoder_->encode_batch(timestamps, count,
+                                                  time_out_stream_))) {
+            return ret;
+        }
+        statistic_->update_time_batch(timestamps, count);
+        return ret;
+    }
+
     FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; 
}
     FORCE_INLINE uint32_t get_time_out_stream_size() const {
         return time_out_stream_.total_size();
@@ -115,7 +137,7 @@ class TimePageWriter {
                           common::ByteStream& pages_data);
 
    private:
-    static const uint32_t OUT_STREAM_PAGE_SIZE = 1024;
+    static const uint32_t OUT_STREAM_PAGE_SIZE = 65536;
 
    private:
     common::TSDataType data_type_;
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 6aa716e28..a861b08b4 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -796,15 +796,16 @@ int TsFileWriter::write_tablet_aligned(const Tablet& 
tablet) {
             data_types))) {
         return ret;
     }
-    time_write_column(time_chunk_writer, tablet);
+    time_write_column_batch(time_chunk_writer, tablet, 0,
+                            tablet.get_cur_row_size());
     ASSERT(value_chunk_writers.size() == tablet.get_column_count());
     for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
         ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
         if (IS_NULL(value_chunk_writer)) {
             continue;
         }
-        if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0,
-                                        tablet.get_cur_row_size()))) {
+        if (RET_FAIL(value_write_column_batch(value_chunk_writer, tablet, c, 0,
+                                               tablet.get_cur_row_size()))) {
             return ret;
         }
     }
@@ -827,7 +828,8 @@ int TsFileWriter::write_tablet(const Tablet& tablet) {
         if (IS_NULL(chunk_writer)) {
             continue;
         }
-        if (RET_FAIL(write_column(chunk_writer, tablet, c))) {
+        if (RET_FAIL(write_column_batch(chunk_writer, tablet, c, 0,
+                                         tablet.max_row_num_))) {
             return ret;
         }
     }
@@ -888,28 +890,72 @@ int TsFileWriter::write_table(Tablet& tablet) {
                                                value_chunk_writers))) {
                 return ret;
             }
-            for (int i = start_idx; i < end_idx; i++) {
-                if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) 
{
-                    return ret;
-                }
-            }
+
+            // Collect column tasks for parallel execution
+            struct ColTask {
+                ValueChunkWriter* writer;
+                uint32_t col_idx;
+            };
+            std::vector<ColTask> tasks;
             uint32_t field_col_count = 0;
             for (uint32_t i = 0; i < tablet.get_column_count(); ++i) {
                 if (tablet.column_categories_[i] ==
                     common::ColumnCategory::FIELD) {
-                    ValueChunkWriter* value_chunk_writer =
+                    ValueChunkWriter* vcw =
                         value_chunk_writers[field_col_count];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
+                    if (!IS_NULL(vcw)) {
+                        tasks.push_back({vcw, i});
                     }
+                    field_col_count++;
+                }
+            }
+
+            // Parallel encode: time column + all value columns concurrently.
+            // Each ChunkWriter has its own Encoder, Statistic, ByteStream —
+            // zero shared state, no locks needed.
+            const uint32_t si = start_idx;
+            const uint32_t ei = end_idx;
+
+            if (tasks.size() >= 2) {
+                // Launch time column + value columns in parallel via thread 
pool
+                auto time_future = thread_pool_.submit(
+                    [this, time_chunk_writer, &tablet, si, ei]() {
+                        return time_write_column_batch(time_chunk_writer,
+                                                       tablet, si, ei);
+                    });
+
+                std::vector<std::future<int>> val_futures;
+                for (size_t t = 0; t < tasks.size(); t++) {
+                    auto& task = tasks[t];
+                    val_futures.push_back(thread_pool_.submit(
+                        [this, &task, &tablet, si, ei]() {
+                            return value_write_column_batch(
+                                task.writer, tablet, task.col_idx, si, ei);
+                        }));
+                }
 
-                    if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
-                                                    i, start_idx, end_idx))) {
+                // Wait for all and check errors
+                ret = time_future.get();
+                if (ret != E_OK) return ret;
+                for (auto& f : val_futures) {
+                    int r = f.get();
+                    if (r != E_OK && ret == E_OK) ret = r;
+                }
+                if (ret != E_OK) return ret;
+            } else {
+                // Too few columns to justify thread overhead, run serially
+                if (RET_FAIL(time_write_column_batch(time_chunk_writer,
+                                                      tablet, si, ei))) {
+                    return ret;
+                }
+                for (auto& task : tasks) {
+                    if (RET_FAIL(value_write_column_batch(
+                            task.writer, tablet, task.col_idx, si, ei))) {
                         return ret;
                     }
-                    field_col_count++;
                 }
             }
+
             start_idx = end_idx;
         } else {
             MeasurementNamesFromTablet mnames_getter(tablet);
@@ -920,14 +966,34 @@ int TsFileWriter::write_table(Tablet& tablet) {
                 return ret;
             }
             ASSERT(chunk_writers.size() == tablet.get_column_count());
-            for (uint32_t c = 0; c < chunk_writers.size(); c++) {
-                ChunkWriter* chunk_writer = chunk_writers[c];
-                if (IS_NULL(chunk_writer)) {
-                    continue;
+
+            // Parallel encode for non-aligned path
+            if (chunk_writers.size() >= 2) {
+                const uint32_t si = start_idx;
+                const uint32_t ei = device_id_end_index_pair.second;
+                std::vector<std::future<int>> futures;
+                for (uint32_t c = 0; c < chunk_writers.size(); c++) {
+                    ChunkWriter* cw = chunk_writers[c];
+                    if (IS_NULL(cw)) continue;
+                    futures.push_back(thread_pool_.submit(
+                        [this, cw, &tablet, c, si, ei]() {
+                            return write_column_batch(cw, tablet, c, si, ei);
+                        }));
                 }
-                if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx,
-                                          device_id_end_index_pair.second))) {
-                    return ret;
+                for (auto& f : futures) {
+                    int r = f.get();
+                    if (r != E_OK && ret == E_OK) ret = r;
+                }
+                if (ret != E_OK) return ret;
+            } else {
+                for (uint32_t c = 0; c < chunk_writers.size(); c++) {
+                    ChunkWriter* chunk_writer = chunk_writers[c];
+                    if (IS_NULL(chunk_writer)) continue;
+                    if (RET_FAIL(write_column_batch(
+                            chunk_writer, tablet, c, start_idx,
+                            device_id_end_index_pair.second))) {
+                        return ret;
+                    }
                 }
             }
             start_idx = device_id_end_index_pair.second;
@@ -1226,6 +1292,141 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* 
value_chunk_writer,
     return ret;
 }
 
+// Writes the tablet timestamps in rows [start_idx, end_idx) to
+// time_chunk_writer as one batch. end_idx is clamped to
+// tablet.max_row_num_. Returns E_INVALID_ARG when the writer or the
+// timestamp array is null, E_OK when the clamped range is empty, and
+// otherwise the result of TimeChunkWriter::write_batch.
+int TsFileWriter::time_write_column_batch(TimeChunkWriter* time_chunk_writer,
+                                          const Tablet& tablet,
+                                          uint32_t start_idx,
+                                          uint32_t end_idx) {
+    int64_t* timestamps = tablet.timestamps_;
+    if (IS_NULL(time_chunk_writer) || IS_NULL(timestamps)) {
+        return E_INVALID_ARG;
+    }
+    end_idx = std::min(end_idx, tablet.max_row_num_);
+    // Compare before subtracting: with start_idx > end_idx the unsigned
+    // difference would wrap to a huge count and read past the array.
+    if (start_idx >= end_idx) {
+        return E_OK;
+    }
+    return time_chunk_writer->write_batch(timestamps + start_idx,
+                                          end_idx - start_idx);
+}
+
+// Writes rows [start_idx, end_idx) of tablet column col_idx to
+// chunk_writer. end_idx is clamped to tablet.max_row_num_ and an empty
+// range returns E_OK.
+//
+// When no bit of the column bitmap is set in the range and the column is
+// BOOLEAN, INT32/DATE, INT64/TIMESTAMP, FLOAT or DOUBLE, the rows go
+// through ChunkWriter::write_batch in one call. In every other case
+// (a set bit in the range, or any other data type) the per-row
+// write_column() is used instead.
+int TsFileWriter::write_column_batch(ChunkWriter* chunk_writer,
+                                      const Tablet& tablet, int col_idx,
+                                      uint32_t start_idx, uint32_t end_idx) {
+    int ret = E_OK;
+    common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
+    int64_t* timestamps = tablet.timestamps_;
+    Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
+    BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx];
+    end_idx = std::min(end_idx, tablet.max_row_num_);
+    // Compare before subtracting: with start_idx > end_idx the unsigned
+    // difference would wrap to a huge count and read past the arrays.
+    if (start_idx >= end_idx) {
+        return ret;
+    }
+    const uint32_t count = end_idx - start_idx;
+
+    // For non-aligned write, bitmap bit=0 means not null.
+    // A single set bit in the range is enough to rule out the fast path.
+    bool has_null = false;
+    for (uint32_t r = start_idx; r < end_idx; r++) {
+        if (col_notnull_bitmap.test(r)) {
+            has_null = true;
+            break;
+        }
+    }
+    if (has_null) {
+        // Slow path: has nulls, fall back to per-row
+        return write_column(chunk_writer, tablet, col_idx, start_idx,
+                            end_idx);
+    }
+
+    // Fast path: no nulls, batch write directly
+    const int64_t* batch_times = timestamps + start_idx;
+    switch (data_type) {
+        case common::BOOLEAN:
+            ret = chunk_writer->write_batch(
+                batch_times, col_values.bool_data + start_idx, count);
+            break;
+        case common::INT32:
+        case common::DATE:
+            ret = chunk_writer->write_batch(
+                batch_times, col_values.int32_data + start_idx, count);
+            break;
+        case common::INT64:
+        case common::TIMESTAMP:
+            ret = chunk_writer->write_batch(
+                batch_times, col_values.int64_data + start_idx, count);
+            break;
+        case common::FLOAT:
+            ret = chunk_writer->write_batch(
+                batch_times, col_values.float_data + start_idx, count);
+            break;
+        case common::DOUBLE:
+            ret = chunk_writer->write_batch(
+                batch_times, col_values.double_data + start_idx, count);
+            break;
+        default:
+            // Fall back to per-row for STRING and other types
+            ret = write_column(chunk_writer, tablet, col_idx, start_idx,
+                               end_idx);
+            break;
+    }
+    return ret;
+}
+
+// Writes rows [start_idx, end_idx) of tablet column col_idx to
+// value_chunk_writer. end_idx is clamped to tablet.max_row_num_ and an
+// empty range returns E_OK.
+//
+// BOOLEAN, INT32/DATE, INT64/TIMESTAMP, FLOAT and DOUBLE columns are
+// handed to ValueChunkWriter::write_batch together with the column
+// bitmap; the column base pointers are passed unshifted and start_idx
+// selects the first row. STRING/TEXT/BLOB columns fall back to the
+// per-row value_write_column(). Any other type yields E_NOT_SUPPORT.
+int TsFileWriter::value_write_column_batch(
+    ValueChunkWriter* value_chunk_writer, const Tablet& tablet, int col_idx,
+    uint32_t start_idx, uint32_t end_idx) {
+    int ret = E_OK;
+    common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
+    int64_t* timestamps = tablet.timestamps_;
+    Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
+    BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx];
+    end_idx = std::min(end_idx, tablet.max_row_num_);
+    // Compare before subtracting: with start_idx > end_idx the unsigned
+    // difference would wrap to a huge count and read past the arrays.
+    if (start_idx >= end_idx) {
+        return ret;
+    }
+    const uint32_t count = end_idx - start_idx;
+
+    switch (data_type) {
+        case common::BOOLEAN:
+            ret = value_chunk_writer->write_batch(
+                timestamps, col_values.bool_data, col_notnull_bitmap,
+                start_idx, count);
+            break;
+        case common::DATE:
+        case common::INT32:
+            ret = value_chunk_writer->write_batch(
+                timestamps, col_values.int32_data, col_notnull_bitmap,
+                start_idx, count);
+            break;
+        case common::TIMESTAMP:
+        case common::INT64:
+            ret = value_chunk_writer->write_batch(
+                timestamps, col_values.int64_data, col_notnull_bitmap,
+                start_idx, count);
+            break;
+        case common::FLOAT:
+            ret = value_chunk_writer->write_batch(
+                timestamps, col_values.float_data, col_notnull_bitmap,
+                start_idx, count);
+            break;
+        case common::DOUBLE:
+            ret = value_chunk_writer->write_batch(
+                timestamps, col_values.double_data, col_notnull_bitmap,
+                start_idx, count);
+            break;
+        case common::STRING:
+        case common::TEXT:
+        case common::BLOB:
+            // String types: fall back to per-row for now
+            ret = value_write_column(value_chunk_writer, tablet, col_idx,
+                                     start_idx, end_idx);
+            break;
+        default:
+            ret = E_NOT_SUPPORT;
+            break;
+    }
+    return ret;
+}
+
 // TODO make sure ret is meaningful to SDK user
 int TsFileWriter::flush() {
     int ret = E_OK;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index 5e18ea2a9..19a7c7a52 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -22,6 +22,7 @@
 #include <fcntl.h>
 
 #include <climits>
+#include <future>
 #include <map>
 #include <memory>
 #include <string>
@@ -32,6 +33,7 @@
 #include "common/record.h"
 #include "common/schema.h"
 #include "common/tablet.h"
+#include "common/thread_pool.h"
 
 namespace storage {
 class WriteFile;
@@ -187,6 +189,7 @@ class TsFileWriter {
     bool write_file_created_;
     bool io_writer_owned_;  // false when init(RestorableTsFileIOWriter*)
     bool table_aligned_ = true;
+    common::ThreadPool thread_pool_{6};
 
     int write_typed_column(ValueChunkWriter* value_chunk_writer,
                            int64_t* timestamps, bool* col_values,
@@ -221,6 +224,16 @@ class TsFileWriter {
     int value_write_column(ValueChunkWriter* value_chunk_writer,
                            const Tablet& tablet, int col_idx,
                            uint32_t start_idx, uint32_t end_idx);
+
+    int write_column_batch(storage::ChunkWriter* chunk_writer,
+                           const Tablet& tablet, int col_idx,
+                           uint32_t start_idx, uint32_t end_idx);
+    int time_write_column_batch(TimeChunkWriter* time_chunk_writer,
+                                const Tablet& tablet, uint32_t start_idx,
+                                uint32_t end_idx);
+    int value_write_column_batch(ValueChunkWriter* value_chunk_writer,
+                                 const Tablet& tablet, int col_idx,
+                                 uint32_t start_idx, uint32_t end_idx);
 };
 
 }  // end namespace storage
diff --git a/cpp/src/writer/value_chunk_writer.h 
b/cpp/src/writer/value_chunk_writer.h
index 859fb57b0..afe94760c 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -109,6 +109,37 @@ class ValueChunkWriter {
         VCW_DO_WRITE_FOR_TYPE(isnull);
     }
 
+    /**
+     * Write `count` rows of one value column, starting at row `start_idx`,
+     * slicing the range so that each call into the value page writer
+     * carries at most the number of points the current page still has
+     * room for (common::g_config_value_.page_writer_max_point_num_ minus
+     * value_page_writer_.get_point_numer()).
+     *
+     * `timestamps` and `values` are passed through unshifted; the row
+     * position is conveyed by the index argument only.
+     *
+     * @param timestamps         timestamp array, indexed by row
+     * @param values             value array, indexed by row
+     * @param col_notnull_bitmap column bitmap forwarded to the page writer
+     * @param start_idx          first row to write
+     * @param count              number of rows to write
+     * @return common::E_OK, or the first error returned by seal_cur_page(),
+     *         value_page_writer_.write_batch() or seal_cur_page_if_full()
+     */
+    template <typename T>
+    int write_batch(const int64_t* timestamps, const T* values,
+                    const common::BitMap& col_notnull_bitmap,
+                    uint32_t start_idx, uint32_t count) {
+        int ret = common::E_OK;
+        const uint32_t max_points =
+            common::g_config_value_.page_writer_max_point_num_;
+        uint32_t offset = 0;
+        while (offset < count) {
+            const uint32_t cur_points = value_page_writer_.get_point_numer();
+            uint32_t page_remaining = 0;
+            if (cur_points < max_points) {
+                page_remaining = max_points - cur_points;
+            } else {
+                // The page is full. Comparing first, instead of testing the
+                // difference for zero, keeps the unsigned subtraction from
+                // wrapping to a huge value when the page already holds more
+                // points than the configured maximum.
+                if (RET_FAIL(seal_cur_page(false))) {
+                    return ret;
+                }
+                page_remaining = max_points;
+            }
+            const uint32_t batch_size =
+                std::min(count - offset, page_remaining);
+            if (RET_FAIL(value_page_writer_.write_batch(
+                    timestamps, values, col_notnull_bitmap,
+                    start_idx + offset, batch_size))) {
+                return ret;
+            }
+            offset += batch_size;
+            if (RET_FAIL(seal_cur_page_if_full())) {
+                return ret;
+            }
+        }
+        return ret;
+    }
+
     int end_encode_chunk();
     common::ByteStream& get_chunk_data() { return chunk_data_; }
     Statistic* get_chunk_statistic() { return chunk_statistic_; }
diff --git a/cpp/src/writer/value_page_writer.h 
b/cpp/src/writer/value_page_writer.h
index 60d75b0b8..d229bfd57 100644
--- a/cpp/src/writer/value_page_writer.h
+++ b/cpp/src/writer/value_page_writer.h
@@ -151,6 +151,63 @@ class ValuePageWriter {
         VPW_DO_WRITE_FOR_TYPE(isnull);
     }
 
+    /**
+     * Batch write for the aligned/table model.
+     *
+     * `col_notnull_bitmap` is the tablet bitmap of this column: a set bit
+     * marks a null row, a cleared bit marks a row that carries a value
+     * (the same sense as ISNULL in VPW_DO_WRITE_FOR_TYPE, where a null is
+     * not encoded). The page's own bitmap uses the opposite sense: a bit
+     * is set for every row that carries a value.
+     *
+     * The page bitmap is extended for all `count` rows first; values are
+     * encoded afterwards, in one encode_batch() call when no row is null
+     * and one by one otherwise.
+     *
+     * @param timestamps         timestamp array, indexed by row
+     * @param values             value array, indexed by row
+     * @param col_notnull_bitmap tablet bitmap of the column (bit=1: null)
+     * @param start_idx          first row to write
+     * @param count              number of rows; 0 is a no-op
+     * @return common::E_OK, or the error reported by the value encoder
+     */
+    template <typename T>
+    int write_batch(const int64_t* timestamps, const T* values,
+                    const common::BitMap& col_notnull_bitmap,
+                    uint32_t start_idx, uint32_t count) {
+        int ret = common::E_OK;
+        if (count == 0) {
+            return ret;
+        }
+        // NOTE(review): the const is cast away to call test(), presumably
+        // because BitMap::test is not const-qualified -- confirm.
+        common::BitMap& tablet_bitmap =
+            const_cast<common::BitMap&>(col_notnull_bitmap);
+
+        // Pass 1: grow the page bitmap and flag every row holding a value.
+        uint32_t valid_count = 0;
+        for (uint32_t i = 0; i < count; ++i) {
+            if ((size_ / 8) + 1 > col_notnull_bitmap_.size()) {
+                col_notnull_bitmap_.push_back(0);
+            }
+            const bool has_value = !tablet_bitmap.test(start_idx + i);
+            if (has_value) {
+                col_notnull_bitmap_[size_ / 8] |= (MASK >> (size_ % 8));
+                ++valid_count;
+            }
+            ++size_;
+        }
+
+        // Every row is null: nothing to encode.
+        if (valid_count == 0) {
+            return ret;
+        }
+
+        if (valid_count != count) {
+            // Pass 2 (mixed rows): encode only the rows holding a value.
+            for (uint32_t i = 0; i < count; ++i) {
+                const uint32_t row = start_idx + i;
+                if (tablet_bitmap.test(row)) {
+                    continue;
+                }
+                if (RET_FAIL(value_encoder_->encode(values[row],
+                                                     value_out_stream_))) {
+                    return ret;
+                }
+                statistic_->update(timestamps[row], values[row]);
+            }
+            return ret;
+        }
+
+        // Pass 2 (no nulls): the whole range goes through the batch encoder.
+        if (RET_FAIL(value_encoder_->encode_batch(values + start_idx, count,
+                                                   value_out_stream_))) {
+            return ret;
+        }
+        statistic_->update_batch(timestamps + start_idx, values + start_idx,
+                                 count);
+        return ret;
+    }
+
     FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_; 
}
     FORCE_INLINE uint32_t get_col_notnull_bitmap_out_stream_size() const {
         return col_notnull_bitmap_out_stream_.total_size();
@@ -199,7 +256,7 @@ class ValuePageWriter {
                           common::ByteStream& pages_data);
 
    private:
-    static const uint32_t OUT_STREAM_PAGE_SIZE = 1024;
+    static const uint32_t OUT_STREAM_PAGE_SIZE = 65536;
 
    private:
     common::TSDataType data_type_;

Reply via email to