This is an automated email from the ASF dual-hosted git repository.

colinlee pushed a commit to branch write_with_parallel
in repository https://gitbox.apache.org/repos/asf/tsfile.git

commit 38b7336e0563d6526c3322740d1be31800d8e60e
Author: ColinLee <[email protected]>
AuthorDate: Thu Apr 9 21:03:22 2026 +0800

    Support parallel (multi-threaded) table writes.
---
 cpp/CMakeLists.txt                                 |   7 +
 cpp/src/common/config/config.h                     |   2 +
 cpp/src/common/global.cc                           |   3 +
 cpp/src/common/global.h                            |  14 +
 cpp/src/common/tablet.cc                           | 131 ++++++++
 cpp/src/common/tablet.h                            |   5 +
 cpp/src/common/thread_pool.h                       | 122 +++++++
 cpp/src/writer/tsfile_writer.cc                    | 373 +++++++--------------
 cpp/src/writer/tsfile_writer.h                     |   9 +-
 .../writer/table_view/tsfile_writer_table_test.cc  |  45 +--
 10 files changed, 436 insertions(+), 275 deletions(-)

diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 9d2015e4..cc50a178 100755
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -150,6 +150,17 @@ if (ENABLE_ZLIB)
     add_definitions(-DENABLE_GZIP)
 endif()
 
+option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)" ON)
+message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}")
+
+if (ENABLE_THREADS)
+    add_definitions(-DENABLE_THREADS)
+    # std::thread needs the platform thread library (pthreads) on some
+    # toolchains; link it into every target defined after this point.
+    find_package(Threads REQUIRED)
+    link_libraries(Threads::Threads)
+endif()
+
 option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF)
 message("cmake using: ENABLE_SIMDE=${ENABLE_SIMDE}")
 
diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h
index 81dad924..e2b2039a 100644
--- a/cpp/src/common/config/config.h
+++ b/cpp/src/common/config/config.h
@@ -46,6 +46,8 @@ typedef struct ConfigValue {
     TSEncoding double_encoding_type_;
     TSEncoding string_encoding_type_;
     CompressionType default_compression_type_;
+    bool parallel_write_enabled_;  // write_table: column-parallel encoding
+    int32_t write_thread_count_;   // writer pool size (setter allows 1..64)
     // When true, aligned writer enforces page size limit strictly by
     // interleaving time/value writes and sealing pages together when any side
     // becomes full.
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index 91ecedda..721522c7 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -54,12 +54,15 @@ void init_config_value() {
     g_config_value_.int64_encoding_type_ = TS_2DIFF;
     g_config_value_.float_encoding_type_ = GORILLA;
     g_config_value_.double_encoding_type_ = GORILLA;
+    g_config_value_.string_encoding_type_ = PLAIN;
     // Default compression type is LZ4
 #ifdef ENABLE_LZ4
     g_config_value_.default_compression_type_ = LZ4;
 #else
     g_config_value_.default_compression_type_ = UNCOMPRESSED;
 #endif
+    g_config_value_.parallel_write_enabled_ = true;  // needs ENABLE_THREADS
+    g_config_value_.write_thread_count_ = 6;  // read when a writer is built
     // Enforce aligned page size limits strictly by default.
     g_config_value_.strict_page_size_ = true;
 }
diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h
index 7937e920..01be5a9c 100644
--- a/cpp/src/common/global.h
+++ b/cpp/src/common/global.h
@@ -163,6 +163,29 @@ FORCE_INLINE uint8_t get_global_compression() {
     return static_cast<uint8_t>(g_config_value_.default_compression_type_);
 }
 
+// Enables/disables column-parallel encoding in TsFileWriter::write_table.
+// write_table only consults this flag in builds with ENABLE_THREADS.
+FORCE_INLINE void set_parallel_write_enabled(bool enabled) {
+    g_config_value_.parallel_write_enabled_ = enabled;
+}
+
+FORCE_INLINE bool get_parallel_write_enabled() {
+    return g_config_value_.parallel_write_enabled_;
+}
+
+// Sets the number of worker threads for writer thread pools.  Returns
+// E_INVALID_ARG (and changes nothing) unless 1 <= count <= 64.  A writer
+// sizes its pool when constructed, so only later writers see the change.
+FORCE_INLINE int set_write_thread_count(int32_t count) {
+    if (count < 1 || count > 64) return E_INVALID_ARG;
+    g_config_value_.write_thread_count_ = count;
+    return E_OK;
+}
+
+FORCE_INLINE int32_t get_write_thread_count() {
+    return g_config_value_.write_thread_count_;
+}
+
 extern int init_common();
 extern bool is_timestamp_column_name(const char* time_col_name);
 extern void cols_to_json(ByteStream* byte_stream,
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index 4088a692..6233abd7 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -19,7 +19,10 @@
 
 #include "tablet.h"
 
+#include <algorithm>
 #include <cstdlib>
+#include <cstring>
+#include <numeric>
 
 #include "allocator/alloc_base.h"
 #include "datatype/date_converter.h"
@@ -491,6 +494,145 @@ void Tablet::set_column_categories(
     }
 }
 
+namespace {
+
+// Reorders arr[0, n) so that the new arr[i] is the old arr[perm[i]].
+template <typename T>
+void permute_array(T* arr, const std::vector<uint32_t>& perm, uint32_t n) {
+    std::vector<T> tmp(n);
+    for (uint32_t i = 0; i < n; i++) tmp[i] = arr[perm[i]];
+    std::copy(tmp.begin(), tmp.end(), arr);
+}
+
+// Rebuilds a string column so that new row i holds old row perm[i].  Rows
+// flagged null in `bm` (which must still be in the old row order) are
+// stored as zero-length entries so the offsets stay valid.
+void permute_string_column(Tablet::StringColumn* sc, BitMap& bm,
+                           const std::vector<uint32_t>& perm, uint32_t n,
+                           uint32_t max_rows) {
+    Tablet::StringColumn tmp;
+    tmp.init(max_rows, sc->buf_used > 0 ? sc->buf_used : 64);
+    for (uint32_t i = 0; i < n; i++) {
+        const uint32_t r = perm[i];
+        if (bm.test(r)) {
+            tmp.append(i, "", 0);
+        } else {
+            const int32_t off = sc->offsets[r];
+            const uint32_t len =
+                static_cast<uint32_t>(sc->offsets[r + 1] - off);
+            tmp.append(i, sc->buffer + off, len);
+        }
+    }
+    // Take over tmp's storage and let tmp release the old one.
+    std::swap(sc->offsets, tmp.offsets);
+    std::swap(sc->buffer, tmp.buffer);
+    std::swap(sc->buf_capacity, tmp.buf_capacity);
+    std::swap(sc->buf_used, tmp.buf_used);
+    tmp.destroy();
+}
+
+// Moves the null bits of rows [0, n) through perm (new row i takes old row
+// perm[i]'s bit).  Old bits are read through BitMap::test() before the map
+// is cleared, so this does not depend on BitMap's internal bit layout.
+void permute_bitmap(BitMap& bm, const std::vector<uint32_t>& perm,
+                    uint32_t n) {
+    if (!bm.get_bitmap()) return;
+    std::vector<char> was_null(n);
+    for (uint32_t r = 0; r < n; r++) was_null[r] = bm.test(r) ? 1 : 0;
+    // Clear all bits (= all non-null), then re-set nulls at their new rows.
+    bm.clear_all();
+    for (uint32_t i = 0; i < n; i++) {
+        if (was_null[perm[i]]) bm.set(i);
+    }
+}
+
+}  // anonymous namespace
+
+void Tablet::sort_by_device() {
+    if (id_column_indexes_.empty() || cur_row_size_ <= 1) return;
+
+    const uint32_t n = cur_row_size_;
+
+    // Build a permutation ordered by TAG column values.  std::stable_sort
+    // keeps the original relative order (e.g. timestamp order) of rows that
+    // belong to the same device.
+    std::vector<uint32_t> perm(n);
+    std::iota(perm.begin(), perm.end(), 0);
+
+    // NOTE(review): every TAG column is assumed to be string-typed, since
+    // value_matrix_[idx].string_col is dereferenced unconditionally.
+    std::stable_sort(perm.begin(), perm.end(), [this](uint32_t a, uint32_t b) {
+        for (int idx : id_column_indexes_) {
+            const bool a_null = bitmaps_[idx].test(a);
+            const bool b_null = bitmaps_[idx].test(b);
+            // Exactly one side null: the non-null row first (nulls last).
+            if (a_null != b_null) return b_null;
+            if (a_null) continue;  // both null: equal on this column
+            const StringColumn& sc = *value_matrix_[idx].string_col;
+            const int32_t a_off = sc.offsets[a];
+            const uint32_t a_len =
+                static_cast<uint32_t>(sc.offsets[a + 1] - a_off);
+            const int32_t b_off = sc.offsets[b];
+            const uint32_t b_len =
+                static_cast<uint32_t>(sc.offsets[b + 1] - b_off);
+            const uint32_t min_len = std::min(a_len, b_len);
+            const int cmp =
+                (min_len > 0)
+                    ? memcmp(sc.buffer + a_off, sc.buffer + b_off, min_len)
+                    : 0;
+            if (cmp != 0) return cmp < 0;
+            if (a_len != b_len) return a_len < b_len;
+        }
+        return false;
+    });
+
+    // Nothing to do if the rows are already grouped by device.
+    bool is_identity = true;
+    for (uint32_t i = 0; i < n; i++) {
+        if (perm[i] != i) {
+            is_identity = false;
+            break;
+        }
+    }
+    if (is_identity) return;
+
+    // Reorder timestamps, then every column's values and null bitmap.  The
+    // string columns are permuted before their bitmaps because
+    // permute_string_column reads the bitmap in the old row order.
+    permute_array(timestamps_, perm, n);
+    const uint32_t col_count = static_cast<uint32_t>(schema_vec_->size());
+    for (uint32_t c = 0; c < col_count; c++) {
+        switch (schema_vec_->at(c).data_type_) {
+            case BOOLEAN:
+                permute_array(value_matrix_[c].bool_data, perm, n);
+                break;
+            case INT32:
+            case DATE:
+                permute_array(value_matrix_[c].int32_data, perm, n);
+                break;
+            case INT64:
+            case TIMESTAMP:
+                permute_array(value_matrix_[c].int64_data, perm, n);
+                break;
+            case FLOAT:
+                permute_array(value_matrix_[c].float_data, perm, n);
+                break;
+            case DOUBLE:
+                permute_array(value_matrix_[c].double_data, perm, n);
+                break;
+            case STRING:
+            case TEXT:
+            case BLOB:
+                permute_string_column(value_matrix_[c].string_col, bitmaps_[c],
+                                      perm, n, max_row_num_);
+                break;
+            default:
+                break;
+        }
+        permute_bitmap(bitmaps_[c], perm, n);
+    }
+}
+
 void Tablet::reset_string_columns() {
     size_t schema_count = schema_vec_->size();
     for (size_t c = 0; c < schema_count; c++) {
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index beedacc0..a6bebae9 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -46,6 +46,7 @@ class TabletColIterator;
  * with their associated metadata such as column names and types.
  */
 class Tablet {
+   public:  // tablet.cc helpers outside the class take Tablet::StringColumn*
     // Arrow-style string column: offsets + contiguous buffer.
     // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
     struct StringColumn {
@@ -284,6 +285,10 @@ class Tablet {
 
     void set_column_categories(
         const std::vector<common::ColumnCategory>& column_categories);
+    // Reorders rows in place so that rows with identical TAG column values
+    // are contiguous.  Stable: rows of one device keep their relative order
+    // (e.g. timestamp order).  No-op without TAG columns or with < 2 rows.
+    void sort_by_device();
     std::shared_ptr<IDeviceID> get_device_id(int i) const;
     std::vector<uint32_t> find_all_device_boundaries() const;
 
diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h
new file mode 100644
index 00000000..9285d4ff
--- /dev/null
+++ b/cpp/src/common/thread_pool.h
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef COMMON_THREAD_POOL_H
+#define COMMON_THREAD_POOL_H
+
+#ifdef ENABLE_THREADS
+
+#include <condition_variable>
+#include <cstddef>
+#include <functional>
+#include <future>
+#include <memory>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+namespace common {
+
+// Fixed-size thread pool supporting both fire-and-forget tasks
+// (submit(std::function<void()>) + wait_all) and future-returning tasks
+// (submit<F>).  Used by the write path for column-parallel encoding.
+//
+// Neither copyable nor movable: the worker threads capture `this`.
+class ThreadPool {
+   public:
+    // Starts num_threads workers -- at least one, so that submitted tasks
+    // always run and futures / wait_all() can never block forever.
+    explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) {
+        if (num_threads == 0) num_threads = 1;
+        workers_.reserve(num_threads);
+        for (size_t i = 0; i < num_threads; i++) {
+            workers_.emplace_back([this] { worker_loop(); });
+        }
+    }
+
+    ThreadPool(const ThreadPool&) = delete;
+    ThreadPool& operator=(const ThreadPool&) = delete;
+
+    // Requests shutdown; workers finish the queued tasks, then are joined.
+    ~ThreadPool() {
+        {
+            std::lock_guard<std::mutex> lk(mu_);
+            stop_ = true;
+        }
+        cv_work_.notify_all();
+        for (auto& w : workers_) {
+            if (w.joinable()) w.join();
+        }
+    }
+
+    // Submit a fire-and-forget task (no return value).  An exception that
+    // escapes the task is thrown on a worker thread and terminates the
+    // process; use the future-returning overload to observe failures.
+    void submit(std::function<void()> task) { enqueue(std::move(task)); }
+
+    // Result type of invoking a (decayed) callable F with no arguments.
+    // Used instead of std::result_of (deprecated in C++17, removed in C++20).
+    template <typename F>
+    using result_t = decltype(std::declval<typename std::decay<F>::type&>()());
+
+    // Submit a task whose return value (or exception) is delivered through
+    // the returned std::future.
+    template <typename F>
+    std::future<result_t<F>> submit(F&& f) {
+        using RetType = result_t<F>;
+        auto task =
+            std::make_shared<std::packaged_task<RetType()>>(std::forward<F>(f));
+        std::future<RetType> result = task->get_future();
+        enqueue([task]() { (*task)(); });
+        return result;
+    }
+
+    // Block until every submitted task (queued or running) has completed.
+    void wait_all() {
+        std::unique_lock<std::mutex> lk(mu_);
+        cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); });
+    }
+
+   private:
+    // Queues one task and wakes a worker.  active_ counts queued plus
+    // running tasks; a worker decrements it after the task returns.
+    void enqueue(std::function<void()> task) {
+        {
+            std::lock_guard<std::mutex> lk(mu_);
+            tasks_.push(std::move(task));
+            active_++;
+        }
+        cv_work_.notify_one();
+    }
+
+    void worker_loop() {
+        while (true) {
+            std::function<void()> task;
+            {
+                std::unique_lock<std::mutex> lk(mu_);
+                cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
+                if (stop_ && tasks_.empty()) return;
+                task = std::move(tasks_.front());
+                tasks_.pop();
+            }
+            task();
+            bool idle = false;
+            {
+                std::lock_guard<std::mutex> lk(mu_);
+                idle = (--active_ == 0);
+            }
+            // notify_all: more than one thread may be blocked in wait_all().
+            if (idle) cv_done_.notify_all();
+        }
+    }
+
+    std::vector<std::thread> workers_;
+    std::queue<std::function<void()>> tasks_;
+    std::mutex mu_;
+    std::condition_variable cv_work_;
+    std::condition_variable cv_done_;
+    bool stop_;
+    int active_;
+};
+
+}  // namespace common
+
+#endif  // ENABLE_THREADS
+
+#endif  // COMMON_THREAD_POOL_H
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 657fcabc..c69915ba 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -1136,280 +1136,169 @@ int TsFileWriter::write_table(Tablet& tablet) {
         return ret;
     }
 
+    // Sort tablet so same-device rows are contiguous.
+    tablet.sort_by_device();
+
     auto device_id_end_index_pairs = split_tablet_by_device(tablet);
     int start_idx = 0;
     for (auto& device_id_end_index_pair : device_id_end_index_pairs) {
         auto device_id = device_id_end_index_pair.first;
         int end_idx = device_id_end_index_pair.second;
         if (end_idx == 0) continue;
-        if (table_aligned_) {
-            SimpleVector<ValueChunkWriter*> value_chunk_writers;
-            TimeChunkWriter* time_chunk_writer = nullptr;
-            if (RET_FAIL(do_check_schema_table(device_id, tablet,
-                                               time_chunk_writer,
-                                               value_chunk_writers))) {
-                return ret;
-            }
 
-            const bool strict_page_size =
-                common::g_config_value_.strict_page_size_;
-
-            std::vector<uint32_t> field_columns;
-            field_columns.reserve(tablet.get_column_count());
-            for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
-                if (tablet.column_categories_[col] ==
-                    common::ColumnCategory::FIELD) {
-                    field_columns.push_back(col);
-                }
-            }
-            ASSERT(field_columns.size() == value_chunk_writers.size());
-
-            const bool has_varlen_field_column = [&]() {
-                for (uint32_t i = 0; i < field_columns.size(); i++) {
-                    const common::TSDataType t =
-                        tablet.schema_vec_->at(field_columns[i]).data_type_;
-                    if (t == common::STRING || t == common::TEXT ||
-                        t == common::BLOB) {
-                        return true;
-                    }
-                }
-                return false;
-            }();
+        SimpleVector<ValueChunkWriter*> value_chunk_writers;
+        TimeChunkWriter* time_chunk_writer = nullptr;
+        if (RET_FAIL(do_check_schema_table(device_id, tablet,
+                                           time_chunk_writer,
+                                           value_chunk_writers))) {
+            return ret;
+        }
 
-            // Keep writers' seal-check behavior consistent across calls.
-            time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
-            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                if (!IS_NULL(value_chunk_writers[c])) {
-                    value_chunk_writers[c]->set_enable_page_seal_if_full(
-                        strict_page_size);
-                }
+        std::vector<uint32_t> field_columns;
+        field_columns.reserve(tablet.get_column_count());
+        for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+            if (tablet.column_categories_[col] ==
+                common::ColumnCategory::FIELD) {
+                field_columns.push_back(col);
             }
+        }
+        ASSERT(field_columns.size() == value_chunk_writers.size());
 
-            if (strict_page_size) {
-                // Strict: row-based insertion and force aligned page sealing
-                // when either time or any value page becomes full.
-                for (int i = start_idx; i < end_idx; i++) {
-                    int32_t time_pages_before =
-                        time_chunk_writer->num_of_pages();
-                    std::vector<int32_t> value_pages_before(
-                        value_chunk_writers.size(), 0);
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        if (!IS_NULL(value_chunk_writers[k])) {
-                            value_pages_before[k] =
-                                value_chunk_writers[k]->num_of_pages();
-                        }
-                    }
-
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[i]))) {
-                        return ret;
-                    }
-
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        ValueChunkWriter* value_chunk_writer =
-                            value_chunk_writers[k];
-                        if (IS_NULL(value_chunk_writer)) {
-                            continue;
-                        }
-                        const uint32_t tablet_col_idx = field_columns[k];
-                        if (RET_FAIL(value_write_column(value_chunk_writer,
-                                                        tablet, tablet_col_idx,
-                                                        i, i + 1))) {
-                            return ret;
-                        }
-                    }
-
-                    if (RET_FAIL(maybe_seal_aligned_pages_together(
-                            time_chunk_writer, value_chunk_writers,
-                            time_pages_before, value_pages_before))) {
-                        return ret;
-                    }
-                }
-            } else if (!has_varlen_field_column) {
-                // Optimization: no string/blob/text columns, so we can
-                // segment by point-number and seal pages at those boundaries
-                // in column-based order.
-                const uint32_t points_per_page =
-                    common::g_config_value_.page_writer_max_point_num_;
-
-                time_chunk_writer->set_enable_page_seal_if_full(false);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
-
-                // Fill the already-unsealed time page first.
-                uint32_t time_cur_points = time_chunk_writer->get_point_numer();
-                if (time_cur_points >= points_per_page &&
-                    time_chunk_writer->has_current_page_data()) {
-                    if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                        return ret;
-                    }
-                    for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                        if (!IS_NULL(value_chunk_writers[c]) &&
-                            value_chunk_writers[c]->has_current_page_data()) {
-                            if (RET_FAIL(value_chunk_writers[c]
-                                             ->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                    }
-                    time_cur_points = 0;
-                }
-
-                const uint32_t first_seg_len =
-                    (time_cur_points > 0 && time_cur_points < points_per_page)
-                        ? (points_per_page - time_cur_points)
-                        : points_per_page;
-
-                // 1) Write time in segments (seal all full segments).
-                uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                uint32_t seg_len = first_seg_len;
-                while (static_cast<int>(seg_start) < end_idx) {
-                    const uint32_t seg_end = std::min(
-                        seg_start + seg_len, static_cast<uint32_t>(end_idx));
-                    if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
-                                                   seg_start, seg_end))) {
-                        return ret;
-                    }
-                    seg_start = seg_end;
-                    if (static_cast<int>(seg_start) < end_idx) {
-                        if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                    }
-                    seg_len = points_per_page;
-                }
-
-                // 2) Write each value column (same segments).
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    seg_start = static_cast<uint32_t>(start_idx);
-                    seg_len = first_seg_len;
-                    while (static_cast<int>(seg_start) < end_idx) {
-                        const uint32_t seg_end =
-                            std::min(seg_start + seg_len,
-                                     static_cast<uint32_t>(end_idx));
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, seg_end))) {
-                            return ret;
-                        }
-                        seg_start = seg_end;
-                        if (static_cast<int>(seg_start) < end_idx) {
-                            if (value_chunk_writer->has_current_page_data() &&
-                                RET_FAIL(
-                                    value_chunk_writer->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                        seg_len = points_per_page;
-                    }
-                }
-            } else {
-                // General non-strict (may have varlen STRING/TEXT/BLOB
-                // columns): time auto-seals to provide aligned page boundaries;
-                // value writers skip auto page sealing and are sealed manually
-                // at recorded time-page boundaries. Attention: since value-side
-                // auto-seal is disabled, if a varlen value page hits the memory
-                // threshold earlier, it may not seal immediately and will be
-                // sealed later at the time-page boundaries (non-strict
-                // sacrifices the strict page size/memory limit for
-                // performance).
-                time_chunk_writer->set_enable_page_seal_if_full(true);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
+        // Precompute page boundaries from point counts — no serial write
+        // needed.  The first segment may be shorter if the time page already
+        // holds data from a previous write_table call.
+        // NOTE(review): strict_page_size_ is no longer consulted on this path
+        // and value-side seal-if-full is disabled while a column is written,
+        // so STRING/TEXT/BLOB value pages are bounded only by point count
+        // here, not by the page memory threshold.
+        const uint32_t page_max_points = std::max<uint32_t>(
+            1, common::g_config_value_.page_writer_max_point_num_);
+        const uint32_t si = static_cast<uint32_t>(start_idx);
+        const uint32_t ei = static_cast<uint32_t>(end_idx);
 
-                std::vector<uint32_t> time_page_row_ends;
-                const uint32_t page_max_points = std::max<uint32_t>(
-                    1, common::g_config_value_.page_writer_max_point_num_);
-                const uint32_t batch_rows =
-                    static_cast<uint32_t>(end_idx - start_idx);
-                time_page_row_ends.reserve(batch_rows / page_max_points + 1);
-                for (uint32_t r = static_cast<uint32_t>(start_idx);
-                     r < static_cast<uint32_t>(end_idx); r++) {
-                    const int32_t pages_before =
-                        time_chunk_writer->num_of_pages();
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[r]))) {
-                        return ret;
-                    }
-                    const int32_t pages_after =
-                        time_chunk_writer->num_of_pages();
-                    if (pages_after > pages_before) {
-                        const uint32_t boundary_end = r + 1;
-                        if (time_page_row_ends.empty() ||
-                            time_page_row_ends.back() != boundary_end) {
-                            time_page_row_ends.push_back(boundary_end);
-                        }
-                    }
+        uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+        // A previous call can leave the open page exactly full: the last
+        // segment is capped at page_max_points and is not sealed below.
+        // Seal it now so the first segment cannot grow it past the limit.
+        if (time_cur_points >= page_max_points &&
+            time_chunk_writer->has_current_page_data()) {
+            if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+                return ret;
+            }
+            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+                if (!IS_NULL(value_chunk_writers[c]) &&
+                    value_chunk_writers[c]->has_current_page_data() &&
+                    RET_FAIL(value_chunk_writers[c]->seal_current_page())) {
+                    return ret;
+                }
+            }
+            time_cur_points = 0;
+        }
+        const uint32_t first_seg_cap =
+            (time_cur_points > 0 && time_cur_points < page_max_points)
+                ? (page_max_points - time_cur_points)
+                : page_max_points;
+
+        std::vector<uint32_t> page_boundaries;  // row indices where a page
+                                                 // should seal
+        {
+            uint32_t pos = si;
+            uint32_t seg_cap = first_seg_cap;
+            while (pos < ei) {
+                uint32_t seg_end = std::min(pos + seg_cap, ei);
+                if (seg_end < ei) {
+                    page_boundaries.push_back(seg_end);
                 }
+                pos = seg_end;
+                seg_cap = page_max_points;
+            }
+        }
 
-                // Write values column-by-column and seal at recorded time
-                // boundaries.
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                    for (uint32_t boundary_end : time_page_row_ends) {
-                        if (boundary_end <= seg_start) {
-                            continue;
-                        }
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, boundary_end))) {
-                            return ret;
-                        }
-                        if (value_chunk_writer->has_current_page_data() &&
-                            RET_FAIL(value_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                        seg_start = boundary_end;
-                    }
-                    if (seg_start < static_cast<uint32_t>(end_idx)) {
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, static_cast<uint32_t>(end_idx)))) {
-                            return ret;
-                        }
-                    }
-                }
+        // Write one column in the segments defined by page_boundaries,
+        // sealing the page at each boundary (time and value variants).
+        auto write_time_in_segments =
+            [this, &tablet, &page_boundaries, si, ei](
+                TimeChunkWriter* tcw) -> int {
+            int r = E_OK;
+            tcw->set_enable_page_seal_if_full(false);
+            uint32_t seg_start = si;
+            for (uint32_t boundary : page_boundaries) {
+                if ((r = time_write_column(tcw, tablet, seg_start,
+                                           boundary)) != E_OK)
+                    return r;
+                if ((r = tcw->seal_current_page()) != E_OK) return r;
+                seg_start = boundary;
             }
-            start_idx = end_idx;
-        } else {
-            MeasurementNamesFromTablet mnames_getter(tablet);
-            SimpleVector<ChunkWriter*> chunk_writers;
-            SimpleVector<common::TSDataType> data_types;
-            if (RET_FAIL(do_check_schema(device_id, mnames_getter,
-                                         chunk_writers, data_types))) {
+            if (seg_start < ei) {
+                r = time_write_column(tcw, tablet, seg_start, ei);
+            }
+            tcw->set_enable_page_seal_if_full(true);
+            return r;
+        };
+
+        auto write_value_in_segments =
+            [this, &tablet, &page_boundaries, si, ei](
+                ValueChunkWriter* vcw, uint32_t col_idx) -> int {
+            int r = E_OK;
+            vcw->set_enable_page_seal_if_full(false);
+            uint32_t seg_start = si;
+            for (uint32_t boundary : page_boundaries) {
+                if ((r = value_write_column(vcw, tablet, col_idx, seg_start,
+                                            boundary)) != E_OK)
+                    return r;
+                if (vcw->has_current_page_data() &&
+                    (r = vcw->seal_current_page()) != E_OK)
+                    return r;
+                seg_start = boundary;
+            }
+            if (seg_start < ei) {
+                r = value_write_column(vcw, tablet, col_idx, seg_start, ei);
+            }
+            vcw->set_enable_page_seal_if_full(true);
+            return r;
+        };
+
+        // All columns (time + values) write the same row segments and seal
+        // at the same boundaries, so each column can run as its own task.
+        // NOTE(review): assumes time_write_column / value_write_column touch
+        // only the chunk writer passed in plus read-only tablet data; confirm
+        // they share no mutable TsFileWriter state.
+#ifdef ENABLE_THREADS
+        if (common::g_config_value_.parallel_write_enabled_) {
+            std::vector<std::future<int>> futures;
+            futures.push_back(thread_pool_.submit(
+                [&write_time_in_segments, time_chunk_writer]() {
+                    return write_time_in_segments(time_chunk_writer);
+                }));
+            for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+                ValueChunkWriter* vcw = value_chunk_writers[k];
+                if (IS_NULL(vcw)) continue;
+                uint32_t col_idx = field_columns[k];
+                futures.push_back(thread_pool_.submit(
+                    [&write_value_in_segments, vcw, col_idx]() {
+                        return write_value_in_segments(vcw, col_idx);
+                    }));
+            }
+            for (auto& f : futures) {
+                int r = f.get();
+                if (r != E_OK && ret == E_OK) ret = r;
+            }
+            if (ret != E_OK) return ret;
+        } else
+#endif
+        {
+            if (RET_FAIL(write_time_in_segments(time_chunk_writer))) {
                 return ret;
             }
-            ASSERT(chunk_writers.size() == tablet.get_column_count());
-            for (uint32_t c = 0; c < chunk_writers.size(); c++) {
-                ChunkWriter* chunk_writer = chunk_writers[c];
-                if (IS_NULL(chunk_writer)) {
-                    continue;
-                }
-                if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx,
-                                          device_id_end_index_pair.second))) {
+            for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+                ValueChunkWriter* vcw = value_chunk_writers[k];
+                if (IS_NULL(vcw)) continue;
+                if (RET_FAIL(write_value_in_segments(vcw, field_columns[k]))) {
                     return ret;
                 }
             }
-            start_idx = device_id_end_index_pair.second;
         }
+        start_idx = end_idx;
     }
     record_count_since_last_flush_ += tablet.cur_row_size_;
     // Reset string column buffers so the tablet can be reused for the next
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index 01028e2e..87646bc7 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -22,6 +22,7 @@
 #include <fcntl.h>
 
 #include <climits>
+#include <future>
 #include <map>
 #include <memory>
 #include <string>
@@ -32,6 +33,9 @@
 #include "common/record.h"
 #include "common/schema.h"
 #include "common/tablet.h"
+#ifdef ENABLE_THREADS
+#include "common/thread_pool.h"
+#endif
 
 namespace storage {
 class WriteFile;
@@ -194,7 +198,14 @@ class TsFileWriter {
     int64_t record_count_for_next_mem_check_;
     bool write_file_created_;
     bool io_writer_owned_;  // false when init(RestorableTsFileIOWriter*)
-    bool table_aligned_ = true;
+#ifdef ENABLE_THREADS
+    // Sized once from the global config when the writer is constructed; a
+    // non-positive write_thread_count_ falls back to a single worker.
+    common::ThreadPool thread_pool_{static_cast<size_t>(
+        common::g_config_value_.write_thread_count_ > 0
+            ? common::g_config_value_.write_thread_count_
+            : 1)};
+#endif
 
     int write_typed_column(ValueChunkWriter* value_chunk_writer,
                            int64_t* timestamps, bool* col_values,
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 477f875e..52f64736 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -26,15 +26,17 @@
 #include "file/tsfile_io_writer.h"
 #include "file/write_file.h"
 #include "reader/tsfile_reader.h"
+#include "common/global.h"
 #include "writer/chunk_writer.h"
 #include "writer/tsfile_table_writer.h"
 using namespace storage;
 using namespace common;
 
-class TsFileWriterTableTest : public ::testing::Test {
+class TsFileWriterTableTest : public ::testing::TestWithParam<bool> {
    protected:
     void SetUp() override {
         libtsfile_init();
+        g_config_value_.parallel_write_enabled_ = GetParam();
         file_name_ = std::string("tsfile_writer_table_test_") +
                      generate_random_string(10) + std::string(".tsfile");
         remove(file_name_.c_str());
@@ -133,7 +135,7 @@ class TsFileWriterTableTest : public ::testing::Test {
     }
 };
 
-TEST_F(TsFileWriterTableTest, WriteTableTest) {
+TEST_P(TsFileWriterTableTest, WriteTableTest) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -144,7 +146,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTest) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) {
+TEST_P(TsFileWriterTableTest, WithoutTagAndMultiPage) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     measurement_schemas.resize(1);
@@ -192,7 +194,7 @@ TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WriteDisorderTest) {
+TEST_P(TsFileWriterTableTest, WriteDisorderTest) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -242,7 +244,7 @@ TEST_F(TsFileWriterTableTest, WriteDisorderTest) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) {
+TEST_P(TsFileWriterTableTest, WriteTableTestMultiFlush) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
         &write_file_, table_schema, 2 * 1024);
@@ -255,7 +257,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) {
+TEST_P(TsFileWriterTableTest, WriteNonExistColumnTest) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -283,7 +285,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) {
+TEST_P(TsFileWriterTableTest, WriteNonExistTableTest) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -295,7 +297,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) {
+TEST_P(TsFileWriterTableTest, WriterWithMemoryThreshold) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
         &write_file_, table_schema, 256 * 1024 * 1024);
@@ -305,7 +307,7 @@ TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, EmptyTagWrite) {
+TEST_P(TsFileWriterTableTest, EmptyTagWrite) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     measurement_schemas.resize(3);
@@ -361,7 +363,7 @@ TEST_F(TsFileWriterTableTest, EmptyTagWrite) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
+TEST_P(TsFileWriterTableTest, WritehDataTypeMisMatch) {
     auto table_schema = gen_table_schema(0);
     auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
         &write_file_, table_schema, 256 * 1024 * 1024);
@@ -412,7 +414,7 @@ TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
     tsfile_table_writer_->close();
 }
 
-TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
+TEST_P(TsFileWriterTableTest, WriteAndReadSimple) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     measurement_schemas.resize(2);
@@ -467,7 +469,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
+TEST_P(TsFileWriterTableTest, DuplicateColumnName) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     measurement_schemas.resize(3);
@@ -505,7 +507,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
+TEST_P(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     for (int i = 0; i < 3; i++) {
@@ -637,7 +639,7 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
     ASSERT_EQ(reader.close(), common::E_OK);
 }
 
-TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
+TEST_P(TsFileWriterTableTest, MultiDeviceMultiFields) {
     common::config_set_max_degree_of_index_node(5);
     auto table_schema = gen_table_schema(0, 1, 100);
     auto tsfile_table_writer_ =
@@ -696,7 +698,7 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
     delete table_schema;
 }
 
-TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
+TEST_P(TsFileWriterTableTest, WriteDataWithEmptyField) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
     for (int i = 0; i < 3; i++) {
@@ -773,7 +775,7 @@ TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
     ASSERT_EQ(reader.close(), common::E_OK);
 }
 
-TEST_F(TsFileWriterTableTest, MultiDatatypes) {
+TEST_P(TsFileWriterTableTest, MultiDatatypes) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
 
@@ -877,7 +879,7 @@ TEST_F(TsFileWriterTableTest, MultiDatatypes) {
     delete[] literal;
 }
 
-TEST_F(TsFileWriterTableTest, DiffCodecTypes) {
+TEST_P(TsFileWriterTableTest, DiffCodecTypes) {
     std::vector<MeasurementSchema*> measurement_schemas;
     std::vector<ColumnCategory> column_categories;
 
@@ -985,7 +987,7 @@ TEST_F(TsFileWriterTableTest, DiffCodecTypes) {
     delete[] literal;
 }
 
-TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) {
+TEST_P(TsFileWriterTableTest, EncodingConfigIntegration) {
     // 1. Test setting global compression type
     ASSERT_EQ(E_OK, set_global_compression(SNAPPY));
 
@@ -1098,7 +1100,7 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) {
 }
 
 #ifdef ENABLE_MEM_STAT
-TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) {
+TEST_P(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) {
     TableSchema* table_schema = gen_table_schema(0, 2, 3);
     auto tsfile_table_writer =
         std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -1172,4 +1174,7 @@ TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) {
 
     delete table_schema;
 }
-#endif
\ No newline at end of file
+#endif
+
+INSTANTIATE_TEST_SUITE_P(Serial, TsFileWriterTableTest, 
::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TsFileWriterTableTest, 
::testing::Values(true));
\ No newline at end of file


Reply via email to