Copilot commented on code in PR #772:
URL: https://github.com/apache/tsfile/pull/772#discussion_r3061803563


##########
cpp/src/common/thread_pool.h:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef COMMON_THREAD_POOL_H
+#define COMMON_THREAD_POOL_H
+
+#ifdef ENABLE_THREADS
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <vector>
+
+namespace common {
+
+// Unified fixed-size thread pool supporting both fire-and-forget tasks
+// (submit void + wait_all) and future-returning tasks (submit<F>).
+// Used by both write path (column-parallel encoding) and read path
+// (column-parallel decoding).
+class ThreadPool {
+   public:
+    explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) {
+        for (size_t i = 0; i < num_threads; i++) {
+            workers_.emplace_back([this] { worker_loop(); });
+        }
+    }
+
+    ~ThreadPool() {
+        {
+            std::lock_guard<std::mutex> lk(mu_);
+            stop_ = true;
+        }
+        cv_work_.notify_all();
+        for (auto& w : workers_) {
+            if (w.joinable()) w.join();
+        }
+    }
+
+    // Submit a fire-and-forget task (no return value).
+    void submit(std::function<void()> task) {
+        {
+            std::lock_guard<std::mutex> lk(mu_);
+            tasks_.push(std::move(task));
+            active_++;
+        }
+        cv_work_.notify_one();
+    }
+
+    // Submit a task that returns a value via std::future.
+    template <typename F>
+    std::future<typename std::result_of<F()>::type> submit(F&& f) {
+        using RetType = typename std::result_of<F()>::type;
+        auto task =
+            std::make_shared<std::packaged_task<RetType()>>(std::forward<F>(f));
+        std::future<RetType> result = task->get_future();
+        {
+            std::lock_guard<std::mutex> lk(mu_);
+            tasks_.emplace([task]() { (*task)(); });
+            active_++;
+        }
+        cv_work_.notify_one();
+        return result;
+    }
+
+    // Block until all submitted tasks have completed.
+    void wait_all() {
+        std::unique_lock<std::mutex> lk(mu_);
+        cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); });
+    }
+
+   private:
+    void worker_loop() {
+        while (true) {
+            std::function<void()> task;
+            {
+                std::unique_lock<std::mutex> lk(mu_);
+                cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
+                if (stop_ && tasks_.empty()) return;
+                task = std::move(tasks_.front());
+                tasks_.pop();
+            }
+            task();
+            {
+                std::lock_guard<std::mutex> lk(mu_);
+                active_--;
+            }
+            cv_done_.notify_one();
+        }

Review Comment:
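   `worker_loop()` executes `task()` without any exception handling. If a task
submitted through the fire-and-forget `submit()` overload throws, the exception
escapes the worker thread's entry function, which calls `std::terminate` and
aborts the whole process; `active_` is also never decremented on that path, so
`wait_all()` could never be satisfied. (Tasks submitted through the
future-returning overload are wrapped in `std::packaged_task`, which stores the
exception in the future instead of letting it escape.) Wrap task execution in a
`try/catch` that ensures `active_` is decremented and optionally stores/logs
the exception.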



##########
cpp/src/common/thread_pool.h:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef COMMON_THREAD_POOL_H
+#define COMMON_THREAD_POOL_H
+
+#ifdef ENABLE_THREADS
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>

Review Comment:
   `ThreadPool` uses `std::result_of` in the `submit(F&&)` template but the
header does not include `<type_traits>`, where `std::result_of` is defined.
This makes the header non-self-contained and can fail to compile depending on
transitive includes. Add the missing include (or switch to `std::invoke_result`
if/when the project moves to C++17 or later, where `std::result_of` is
deprecated).
   ```suggestion
   #include <thread>
   #include <type_traits>
   ```



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -1142,274 +1142,138 @@ int TsFileWriter::write_table(Tablet& tablet) {
         auto device_id = device_id_end_index_pair.first;
         int end_idx = device_id_end_index_pair.second;
         if (end_idx == 0) continue;
-        if (table_aligned_) {
-            SimpleVector<ValueChunkWriter*> value_chunk_writers;
-            TimeChunkWriter* time_chunk_writer = nullptr;
-            if (RET_FAIL(do_check_schema_table(device_id, tablet,
-                                               time_chunk_writer,
-                                               value_chunk_writers))) {
-                return ret;
-            }
-
-            const bool strict_page_size =
-                common::g_config_value_.strict_page_size_;
 
-            std::vector<uint32_t> field_columns;
-            field_columns.reserve(tablet.get_column_count());
-            for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
-                if (tablet.column_categories_[col] ==
-                    common::ColumnCategory::FIELD) {
-                    field_columns.push_back(col);
-                }
-            }
-            ASSERT(field_columns.size() == value_chunk_writers.size());
-
-            const bool has_varlen_field_column = [&]() {
-                for (uint32_t i = 0; i < field_columns.size(); i++) {
-                    const common::TSDataType t =
-                        tablet.schema_vec_->at(field_columns[i]).data_type_;
-                    if (t == common::STRING || t == common::TEXT ||
-                        t == common::BLOB) {
-                        return true;
-                    }
-                }
-                return false;
-            }();
+        SimpleVector<ValueChunkWriter*> value_chunk_writers;
+        TimeChunkWriter* time_chunk_writer = nullptr;
+        if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer,
+                                           value_chunk_writers))) {
+            return ret;
+        }
 
-            // Keep writers' seal-check behavior consistent across calls.
-            time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
-            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                if (!IS_NULL(value_chunk_writers[c])) {
-                    value_chunk_writers[c]->set_enable_page_seal_if_full(
-                        strict_page_size);
-                }
+        std::vector<uint32_t> field_columns;
+        field_columns.reserve(tablet.get_column_count());
+        for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+            if (tablet.column_categories_[col] ==
+                common::ColumnCategory::FIELD) {
+                field_columns.push_back(col);
             }
+        }
+        ASSERT(field_columns.size() == value_chunk_writers.size());
 
-            if (strict_page_size) {
-                // Strict: row-based insertion and force aligned page sealing
-                // when either time or any value page becomes full.
-                for (int i = start_idx; i < end_idx; i++) {
-                    int32_t time_pages_before =
-                        time_chunk_writer->num_of_pages();
-                    std::vector<int32_t> value_pages_before(
-                        value_chunk_writers.size(), 0);
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        if (!IS_NULL(value_chunk_writers[k])) {
-                            value_pages_before[k] =
-                                value_chunk_writers[k]->num_of_pages();
-                        }
-                    }
-
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[i]))) {
-                        return ret;
-                    }
-
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        ValueChunkWriter* value_chunk_writer =
-                            value_chunk_writers[k];
-                        if (IS_NULL(value_chunk_writer)) {
-                            continue;
-                        }
-                        const uint32_t tablet_col_idx = field_columns[k];
-                        if (RET_FAIL(value_write_column(value_chunk_writer,
-                                                        tablet, tablet_col_idx,
-                                                        i, i + 1))) {
-                            return ret;
-                        }
-                    }
-
-                    if (RET_FAIL(maybe_seal_aligned_pages_together(
-                            time_chunk_writer, value_chunk_writers,
-                            time_pages_before, value_pages_before))) {
-                        return ret;
-                    }
-                }
-            } else if (!has_varlen_field_column) {
-                // Optimization: no string/blob/text columns, so we can
-                // segment by point-number and seal pages at those boundaries
-                // in column-based order.
-                const uint32_t points_per_page =
-                    common::g_config_value_.page_writer_max_point_num_;
-
-                time_chunk_writer->set_enable_page_seal_if_full(false);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
-
-                // Fill the already-unsealed time page first.
-                uint32_t time_cur_points = time_chunk_writer->get_point_numer();
-                if (time_cur_points >= points_per_page &&
-                    time_chunk_writer->has_current_page_data()) {
-                    if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                        return ret;
-                    }
-                    for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                        if (!IS_NULL(value_chunk_writers[c]) &&
-                            value_chunk_writers[c]->has_current_page_data()) {
-                            if (RET_FAIL(value_chunk_writers[c]
-                                             ->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                    }
-                    time_cur_points = 0;
-                }
-
-                const uint32_t first_seg_len =
-                    (time_cur_points > 0 && time_cur_points < points_per_page)
-                        ? (points_per_page - time_cur_points)
-                        : points_per_page;
-
-                // 1) Write time in segments (seal all full segments).
-                uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                uint32_t seg_len = first_seg_len;
-                while (static_cast<int>(seg_start) < end_idx) {
-                    const uint32_t seg_end = std::min(
-                        seg_start + seg_len, static_cast<uint32_t>(end_idx));
-                    if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
-                                                   seg_start, seg_end))) {
-                        return ret;
-                    }
-                    seg_start = seg_end;
-                    if (static_cast<int>(seg_start) < end_idx) {
-                        if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                    }
-                    seg_len = points_per_page;
-                }
-
-                // 2) Write each value column (same segments).
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    seg_start = static_cast<uint32_t>(start_idx);
-                    seg_len = first_seg_len;
-                    while (static_cast<int>(seg_start) < end_idx) {
-                        const uint32_t seg_end =
-                            std::min(seg_start + seg_len,
-                                     static_cast<uint32_t>(end_idx));
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, seg_end))) {
-                            return ret;
-                        }
-                        seg_start = seg_end;
-                        if (static_cast<int>(seg_start) < end_idx) {
-                            if (value_chunk_writer->has_current_page_data() &&
-                                RET_FAIL(
-                                    value_chunk_writer->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                        seg_len = points_per_page;
-                    }
-                }
-            } else {
-                // General non-strict (may have varlen STRING/TEXT/BLOB
-                // columns): time auto-seals to provide aligned page boundaries;
-                // value writers skip auto page sealing and are sealed manually
-                // at recorded time-page boundaries. Attention: since value-side
-                // auto-seal is disabled, if a varlen value page hits the memory
-                // threshold earlier, it may not seal immediately and will be
-                // sealed later at the time-page boundaries (non-strict
-                // sacrifices the strict page size/memory limit for
-                // performance).
-                time_chunk_writer->set_enable_page_seal_if_full(true);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
+        // Precompute page boundaries from point counts — no serial write
+        // needed.  The first segment may be shorter if the time page already
+        // holds data from a previous write_table call.
+        const uint32_t page_max_points = std::max<uint32_t>(
+            1, common::g_config_value_.page_writer_max_point_num_);
+        const uint32_t si = static_cast<uint32_t>(start_idx);
+        const uint32_t ei = static_cast<uint32_t>(end_idx);
 
-                std::vector<uint32_t> time_page_row_ends;
-                const uint32_t page_max_points = std::max<uint32_t>(
-                    1, common::g_config_value_.page_writer_max_point_num_);
-                const uint32_t batch_rows =
-                    static_cast<uint32_t>(end_idx - start_idx);
-                time_page_row_ends.reserve(batch_rows / page_max_points + 1);
-                for (uint32_t r = static_cast<uint32_t>(start_idx);
-                     r < static_cast<uint32_t>(end_idx); r++) {
-                    const int32_t pages_before =
-                        time_chunk_writer->num_of_pages();
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[r]))) {
-                        return ret;
-                    }
-                    const int32_t pages_after =
-                        time_chunk_writer->num_of_pages();
-                    if (pages_after > pages_before) {
-                        const uint32_t boundary_end = r + 1;
-                        if (time_page_row_ends.empty() ||
-                            time_page_row_ends.back() != boundary_end) {
-                            time_page_row_ends.push_back(boundary_end);
-                        }
-                    }
+        uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+        const uint32_t first_seg_cap =
+            (time_cur_points > 0 && time_cur_points < page_max_points)
+                ? (page_max_points - time_cur_points)
+                : page_max_points;
+

Review Comment:
   `first_seg_cap` is derived from `time_chunk_writer->get_point_numer()`, but 
there is no handling for the case where the existing unsealed time page is 
already full (by points or memory). Previously `write_table` explicitly sealed 
a full leftover page before continuing. With auto-seal disabled later, writing 
into an already-full page can produce oversized pages or unexpected behavior. 
Consider sealing the current page(s) when the existing page meets the 
configured thresholds before starting segmented writes.



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -1142,274 +1142,138 @@ int TsFileWriter::write_table(Tablet& tablet) {
         auto device_id = device_id_end_index_pair.first;
         int end_idx = device_id_end_index_pair.second;
         if (end_idx == 0) continue;
-        if (table_aligned_) {
-            SimpleVector<ValueChunkWriter*> value_chunk_writers;
-            TimeChunkWriter* time_chunk_writer = nullptr;
-            if (RET_FAIL(do_check_schema_table(device_id, tablet,
-                                               time_chunk_writer,
-                                               value_chunk_writers))) {
-                return ret;
-            }
-
-            const bool strict_page_size =
-                common::g_config_value_.strict_page_size_;
 
-            std::vector<uint32_t> field_columns;
-            field_columns.reserve(tablet.get_column_count());
-            for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
-                if (tablet.column_categories_[col] ==
-                    common::ColumnCategory::FIELD) {
-                    field_columns.push_back(col);
-                }
-            }
-            ASSERT(field_columns.size() == value_chunk_writers.size());
-
-            const bool has_varlen_field_column = [&]() {
-                for (uint32_t i = 0; i < field_columns.size(); i++) {
-                    const common::TSDataType t =
-                        tablet.schema_vec_->at(field_columns[i]).data_type_;
-                    if (t == common::STRING || t == common::TEXT ||
-                        t == common::BLOB) {
-                        return true;
-                    }
-                }
-                return false;
-            }();
+        SimpleVector<ValueChunkWriter*> value_chunk_writers;
+        TimeChunkWriter* time_chunk_writer = nullptr;
+        if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer,
+                                           value_chunk_writers))) {
+            return ret;
+        }
 
-            // Keep writers' seal-check behavior consistent across calls.
-            time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
-            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                if (!IS_NULL(value_chunk_writers[c])) {
-                    value_chunk_writers[c]->set_enable_page_seal_if_full(
-                        strict_page_size);
-                }
+        std::vector<uint32_t> field_columns;
+        field_columns.reserve(tablet.get_column_count());
+        for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+            if (tablet.column_categories_[col] ==
+                common::ColumnCategory::FIELD) {
+                field_columns.push_back(col);
             }
+        }
+        ASSERT(field_columns.size() == value_chunk_writers.size());
 
-            if (strict_page_size) {
-                // Strict: row-based insertion and force aligned page sealing
-                // when either time or any value page becomes full.
-                for (int i = start_idx; i < end_idx; i++) {
-                    int32_t time_pages_before =
-                        time_chunk_writer->num_of_pages();
-                    std::vector<int32_t> value_pages_before(
-                        value_chunk_writers.size(), 0);
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        if (!IS_NULL(value_chunk_writers[k])) {
-                            value_pages_before[k] =
-                                value_chunk_writers[k]->num_of_pages();
-                        }
-                    }
-
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[i]))) {
-                        return ret;
-                    }
-
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        ValueChunkWriter* value_chunk_writer =
-                            value_chunk_writers[k];
-                        if (IS_NULL(value_chunk_writer)) {
-                            continue;
-                        }
-                        const uint32_t tablet_col_idx = field_columns[k];
-                        if (RET_FAIL(value_write_column(value_chunk_writer,
-                                                        tablet, tablet_col_idx,
-                                                        i, i + 1))) {
-                            return ret;
-                        }
-                    }
-
-                    if (RET_FAIL(maybe_seal_aligned_pages_together(
-                            time_chunk_writer, value_chunk_writers,
-                            time_pages_before, value_pages_before))) {
-                        return ret;
-                    }
-                }
-            } else if (!has_varlen_field_column) {
-                // Optimization: no string/blob/text columns, so we can
-                // segment by point-number and seal pages at those boundaries
-                // in column-based order.
-                const uint32_t points_per_page =
-                    common::g_config_value_.page_writer_max_point_num_;
-
-                time_chunk_writer->set_enable_page_seal_if_full(false);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
-
-                // Fill the already-unsealed time page first.
-                uint32_t time_cur_points = time_chunk_writer->get_point_numer();
-                if (time_cur_points >= points_per_page &&
-                    time_chunk_writer->has_current_page_data()) {
-                    if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                        return ret;
-                    }
-                    for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                        if (!IS_NULL(value_chunk_writers[c]) &&
-                            value_chunk_writers[c]->has_current_page_data()) {
-                            if (RET_FAIL(value_chunk_writers[c]
-                                             ->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                    }
-                    time_cur_points = 0;
-                }
-
-                const uint32_t first_seg_len =
-                    (time_cur_points > 0 && time_cur_points < points_per_page)
-                        ? (points_per_page - time_cur_points)
-                        : points_per_page;
-
-                // 1) Write time in segments (seal all full segments).
-                uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                uint32_t seg_len = first_seg_len;
-                while (static_cast<int>(seg_start) < end_idx) {
-                    const uint32_t seg_end = std::min(
-                        seg_start + seg_len, static_cast<uint32_t>(end_idx));
-                    if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
-                                                   seg_start, seg_end))) {
-                        return ret;
-                    }
-                    seg_start = seg_end;
-                    if (static_cast<int>(seg_start) < end_idx) {
-                        if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                    }
-                    seg_len = points_per_page;
-                }
-
-                // 2) Write each value column (same segments).
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    seg_start = static_cast<uint32_t>(start_idx);
-                    seg_len = first_seg_len;
-                    while (static_cast<int>(seg_start) < end_idx) {
-                        const uint32_t seg_end =
-                            std::min(seg_start + seg_len,
-                                     static_cast<uint32_t>(end_idx));
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, seg_end))) {
-                            return ret;
-                        }
-                        seg_start = seg_end;
-                        if (static_cast<int>(seg_start) < end_idx) {
-                            if (value_chunk_writer->has_current_page_data() &&
-                                RET_FAIL(
-                                    value_chunk_writer->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                        seg_len = points_per_page;
-                    }
-                }
-            } else {
-                // General non-strict (may have varlen STRING/TEXT/BLOB
-                // columns): time auto-seals to provide aligned page boundaries;
-                // value writers skip auto page sealing and are sealed manually
-                // at recorded time-page boundaries. Attention: since value-side
-                // auto-seal is disabled, if a varlen value page hits the memory
-                // threshold earlier, it may not seal immediately and will be
-                // sealed later at the time-page boundaries (non-strict
-                // sacrifices the strict page size/memory limit for
-                // performance).
-                time_chunk_writer->set_enable_page_seal_if_full(true);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
+        // Precompute page boundaries from point counts — no serial write
+        // needed.  The first segment may be shorter if the time page already
+        // holds data from a previous write_table call.
+        const uint32_t page_max_points = std::max<uint32_t>(
+            1, common::g_config_value_.page_writer_max_point_num_);
+        const uint32_t si = static_cast<uint32_t>(start_idx);
+        const uint32_t ei = static_cast<uint32_t>(end_idx);
 
-                std::vector<uint32_t> time_page_row_ends;
-                const uint32_t page_max_points = std::max<uint32_t>(
-                    1, common::g_config_value_.page_writer_max_point_num_);
-                const uint32_t batch_rows =
-                    static_cast<uint32_t>(end_idx - start_idx);
-                time_page_row_ends.reserve(batch_rows / page_max_points + 1);
-                for (uint32_t r = static_cast<uint32_t>(start_idx);
-                     r < static_cast<uint32_t>(end_idx); r++) {
-                    const int32_t pages_before =
-                        time_chunk_writer->num_of_pages();
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[r]))) {
-                        return ret;
-                    }
-                    const int32_t pages_after =
-                        time_chunk_writer->num_of_pages();
-                    if (pages_after > pages_before) {
-                        const uint32_t boundary_end = r + 1;
-                        if (time_page_row_ends.empty() ||
-                            time_page_row_ends.back() != boundary_end) {
-                            time_page_row_ends.push_back(boundary_end);
-                        }
-                    }
+        uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+        const uint32_t first_seg_cap =
+            (time_cur_points > 0 && time_cur_points < page_max_points)
+                ? (page_max_points - time_cur_points)
+                : page_max_points;
+
+        std::vector<uint32_t> page_boundaries;  // row indices where a page
+                                                // should seal
+        {
+            uint32_t pos = si;
+            uint32_t seg_cap = first_seg_cap;
+            while (pos < ei) {
+                uint32_t seg_end = std::min(pos + seg_cap, ei);
+                if (seg_end < ei) {
+                    page_boundaries.push_back(seg_end);
                 }
+                pos = seg_end;
+                seg_cap = page_max_points;
+            }
+        }
 
-                // Write values column-by-column and seal at recorded time
-                // boundaries.
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                    for (uint32_t boundary_end : time_page_row_ends) {
-                        if (boundary_end <= seg_start) {
-                            continue;
-                        }
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, boundary_end))) {
-                            return ret;
-                        }
-                        if (value_chunk_writer->has_current_page_data() &&
-                            RET_FAIL(value_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                        seg_start = boundary_end;
-                    }
-                    if (seg_start < static_cast<uint32_t>(end_idx)) {
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, static_cast<uint32_t>(end_idx)))) {
-                            return ret;
-                        }
-                    }
-                }
+        // Write one column in segments defined by page_boundaries, sealing
+        // at each boundary.  Works for both time and value columns.
+        // We control page sealing explicitly at precomputed boundaries, so
+        // auto-seal must be disabled — otherwise a segment of exactly
+        // page_max_points would trigger auto-seal AND our explicit seal,
+        // double-sealing (sealing an empty page → crash).
+        auto write_time_in_segments = [this, &tablet, &page_boundaries, si,
+                                       ei](TimeChunkWriter* tcw) -> int {
+            int r = E_OK;
+            tcw->set_enable_page_seal_if_full(false);
+            uint32_t seg_start = si;
+            for (uint32_t boundary : page_boundaries) {
+                if ((r = time_write_column(tcw, tablet, seg_start, boundary)) !=
+                    E_OK)
+                    return r;
+                if ((r = tcw->seal_current_page()) != E_OK) return r;
+                seg_start = boundary;
             }
-            start_idx = end_idx;
-        } else {
-            MeasurementNamesFromTablet mnames_getter(tablet);
-            SimpleVector<ChunkWriter*> chunk_writers;
-            SimpleVector<common::TSDataType> data_types;
-            if (RET_FAIL(do_check_schema(device_id, mnames_getter,
-                                         chunk_writers, data_types))) {
+            if (seg_start < ei) {
+                r = time_write_column(tcw, tablet, seg_start, ei);
+            }
+            tcw->set_enable_page_seal_if_full(true);
+            return r;

Review Comment:
   `write_time_in_segments` / `write_value_in_segments` unconditionally restore 
`set_enable_page_seal_if_full(true)` at the end. That changes writer behavior 
when `g_config_value_.strict_page_size_` is false (and may also override any 
previous state), making sealing behavior inconsistent across calls. Restore the 
flag to the intended configuration value (or the prior state) instead of 
forcing `true`.



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -1142,274 +1142,138 @@ int TsFileWriter::write_table(Tablet& tablet) {
         auto device_id = device_id_end_index_pair.first;
         int end_idx = device_id_end_index_pair.second;
         if (end_idx == 0) continue;
-        if (table_aligned_) {
-            SimpleVector<ValueChunkWriter*> value_chunk_writers;
-            TimeChunkWriter* time_chunk_writer = nullptr;
-            if (RET_FAIL(do_check_schema_table(device_id, tablet,
-                                               time_chunk_writer,
-                                               value_chunk_writers))) {
-                return ret;
-            }
-
-            const bool strict_page_size =
-                common::g_config_value_.strict_page_size_;
 
-            std::vector<uint32_t> field_columns;
-            field_columns.reserve(tablet.get_column_count());
-            for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
-                if (tablet.column_categories_[col] ==
-                    common::ColumnCategory::FIELD) {
-                    field_columns.push_back(col);
-                }
-            }
-            ASSERT(field_columns.size() == value_chunk_writers.size());
-
-            const bool has_varlen_field_column = [&]() {
-                for (uint32_t i = 0; i < field_columns.size(); i++) {
-                    const common::TSDataType t =
-                        tablet.schema_vec_->at(field_columns[i]).data_type_;
-                    if (t == common::STRING || t == common::TEXT ||
-                        t == common::BLOB) {
-                        return true;
-                    }
-                }
-                return false;
-            }();
+        SimpleVector<ValueChunkWriter*> value_chunk_writers;
+        TimeChunkWriter* time_chunk_writer = nullptr;
+        if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer,
+                                           value_chunk_writers))) {
+            return ret;
+        }
 
-            // Keep writers' seal-check behavior consistent across calls.
-            time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
-            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                if (!IS_NULL(value_chunk_writers[c])) {
-                    value_chunk_writers[c]->set_enable_page_seal_if_full(
-                        strict_page_size);
-                }
+        std::vector<uint32_t> field_columns;
+        field_columns.reserve(tablet.get_column_count());
+        for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+            if (tablet.column_categories_[col] ==
+                common::ColumnCategory::FIELD) {
+                field_columns.push_back(col);
             }
+        }
+        ASSERT(field_columns.size() == value_chunk_writers.size());
 
-            if (strict_page_size) {
-                // Strict: row-based insertion and force aligned page sealing
-                // when either time or any value page becomes full.
-                for (int i = start_idx; i < end_idx; i++) {
-                    int32_t time_pages_before =
-                        time_chunk_writer->num_of_pages();
-                    std::vector<int32_t> value_pages_before(
-                        value_chunk_writers.size(), 0);
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        if (!IS_NULL(value_chunk_writers[k])) {
-                            value_pages_before[k] =
-                                value_chunk_writers[k]->num_of_pages();
-                        }
-                    }
-
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[i]))) {
-                        return ret;
-                    }
-
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        ValueChunkWriter* value_chunk_writer =
-                            value_chunk_writers[k];
-                        if (IS_NULL(value_chunk_writer)) {
-                            continue;
-                        }
-                        const uint32_t tablet_col_idx = field_columns[k];
-                        if (RET_FAIL(value_write_column(value_chunk_writer,
-                                                        tablet, tablet_col_idx,
-                                                        i, i + 1))) {
-                            return ret;
-                        }
-                    }
-
-                    if (RET_FAIL(maybe_seal_aligned_pages_together(
-                            time_chunk_writer, value_chunk_writers,
-                            time_pages_before, value_pages_before))) {
-                        return ret;
-                    }
-                }
-            } else if (!has_varlen_field_column) {
-                // Optimization: no string/blob/text columns, so we can
-                // segment by point-number and seal pages at those boundaries
-                // in column-based order.
-                const uint32_t points_per_page =
-                    common::g_config_value_.page_writer_max_point_num_;
-
-                time_chunk_writer->set_enable_page_seal_if_full(false);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
-
-                // Fill the already-unsealed time page first.
-                uint32_t time_cur_points = time_chunk_writer->get_point_numer();
-                if (time_cur_points >= points_per_page &&
-                    time_chunk_writer->has_current_page_data()) {
-                    if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                        return ret;
-                    }
-                    for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                        if (!IS_NULL(value_chunk_writers[c]) &&
-                            value_chunk_writers[c]->has_current_page_data()) {
-                            if (RET_FAIL(value_chunk_writers[c]
-                                             ->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                    }
-                    time_cur_points = 0;
-                }
-
-                const uint32_t first_seg_len =
-                    (time_cur_points > 0 && time_cur_points < points_per_page)
-                        ? (points_per_page - time_cur_points)
-                        : points_per_page;
-
-                // 1) Write time in segments (seal all full segments).
-                uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                uint32_t seg_len = first_seg_len;
-                while (static_cast<int>(seg_start) < end_idx) {
-                    const uint32_t seg_end = std::min(
-                        seg_start + seg_len, static_cast<uint32_t>(end_idx));
-                    if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
-                                                   seg_start, seg_end))) {
-                        return ret;
-                    }
-                    seg_start = seg_end;
-                    if (static_cast<int>(seg_start) < end_idx) {
-                        if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                    }
-                    seg_len = points_per_page;
-                }
-
-                // 2) Write each value column (same segments).
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    seg_start = static_cast<uint32_t>(start_idx);
-                    seg_len = first_seg_len;
-                    while (static_cast<int>(seg_start) < end_idx) {
-                        const uint32_t seg_end =
-                            std::min(seg_start + seg_len,
-                                     static_cast<uint32_t>(end_idx));
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, seg_end))) {
-                            return ret;
-                        }
-                        seg_start = seg_end;
-                        if (static_cast<int>(seg_start) < end_idx) {
-                            if (value_chunk_writer->has_current_page_data() &&
-                                RET_FAIL(
-                                    value_chunk_writer->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                        seg_len = points_per_page;
-                    }
-                }
-            } else {
-                // General non-strict (may have varlen STRING/TEXT/BLOB
-                // columns): time auto-seals to provide aligned page boundaries;
-                // value writers skip auto page sealing and are sealed manually
-                // at recorded time-page boundaries. Attention: since value-side
-                // auto-seal is disabled, if a varlen value page hits the memory
-                // threshold earlier, it may not seal immediately and will be
-                // sealed later at the time-page boundaries (non-strict
-                // sacrifices the strict page size/memory limit for
-                // performance).
-                time_chunk_writer->set_enable_page_seal_if_full(true);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
+        // Precompute page boundaries from point counts — no serial write
+        // needed.  The first segment may be shorter if the time page already
+        // holds data from a previous write_table call.
+        const uint32_t page_max_points = std::max<uint32_t>(
+            1, common::g_config_value_.page_writer_max_point_num_);
+        const uint32_t si = static_cast<uint32_t>(start_idx);
+        const uint32_t ei = static_cast<uint32_t>(end_idx);
 
-                std::vector<uint32_t> time_page_row_ends;
-                const uint32_t page_max_points = std::max<uint32_t>(
-                    1, common::g_config_value_.page_writer_max_point_num_);
-                const uint32_t batch_rows =
-                    static_cast<uint32_t>(end_idx - start_idx);
-                time_page_row_ends.reserve(batch_rows / page_max_points + 1);
-                for (uint32_t r = static_cast<uint32_t>(start_idx);
-                     r < static_cast<uint32_t>(end_idx); r++) {
-                    const int32_t pages_before =
-                        time_chunk_writer->num_of_pages();
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[r]))) {
-                        return ret;
-                    }
-                    const int32_t pages_after =
-                        time_chunk_writer->num_of_pages();
-                    if (pages_after > pages_before) {
-                        const uint32_t boundary_end = r + 1;
-                        if (time_page_row_ends.empty() ||
-                            time_page_row_ends.back() != boundary_end) {
-                            time_page_row_ends.push_back(boundary_end);
-                        }
-                    }
+        uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+        const uint32_t first_seg_cap =
+            (time_cur_points > 0 && time_cur_points < page_max_points)
+                ? (page_max_points - time_cur_points)
+                : page_max_points;
+
+        std::vector<uint32_t> page_boundaries;  // row indices where a page
+                                                // should seal
+        {
+            uint32_t pos = si;
+            uint32_t seg_cap = first_seg_cap;
+            while (pos < ei) {
+                uint32_t seg_end = std::min(pos + seg_cap, ei);
+                if (seg_end < ei) {
+                    page_boundaries.push_back(seg_end);
                 }
+                pos = seg_end;
+                seg_cap = page_max_points;
+            }
+        }
 
-                // Write values column-by-column and seal at recorded time
-                // boundaries.
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                    for (uint32_t boundary_end : time_page_row_ends) {
-                        if (boundary_end <= seg_start) {
-                            continue;
-                        }
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, boundary_end))) {
-                            return ret;
-                        }
-                        if (value_chunk_writer->has_current_page_data() &&
-                            RET_FAIL(value_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                        seg_start = boundary_end;
-                    }
-                    if (seg_start < static_cast<uint32_t>(end_idx)) {
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, static_cast<uint32_t>(end_idx)))) {
-                            return ret;
-                        }
-                    }
-                }
+        // Write one column in segments defined by page_boundaries, sealing
+        // at each boundary.  Works for both time and value columns.
+        // We control page sealing explicitly at precomputed boundaries, so
+        // auto-seal must be disabled — otherwise a segment of exactly
+        // page_max_points would trigger auto-seal AND our explicit seal,
+        // double-sealing (sealing an empty page → crash).
+        auto write_time_in_segments = [this, &tablet, &page_boundaries, si,
+                                       ei](TimeChunkWriter* tcw) -> int {
+            int r = E_OK;
+            tcw->set_enable_page_seal_if_full(false);
+            uint32_t seg_start = si;

Review Comment:
   This new segmented write path disables auto-sealing (via 
`set_enable_page_seal_if_full(false)`) and seals only at precomputed 
*point-count* boundaries. That bypasses the existing page-full logic, which 
also enforces `page_writer_max_memory_bytes_` (see 
`TimeChunkWriter::is_cur_page_full()` / 
`ValueChunkWriter::is_cur_page_full()`). For varlen columns (STRING/TEXT/BLOB) 
or when strict sizing is expected, pages can grow past the memory threshold 
because auto-seal is off. Consider keeping the previous strict/varlen behavior 
(or at least ensuring memory-threshold-based sealing remains effective) before 
parallelizing.



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -1142,274 +1142,138 @@ int TsFileWriter::write_table(Tablet& tablet) {
         auto device_id = device_id_end_index_pair.first;
         int end_idx = device_id_end_index_pair.second;
         if (end_idx == 0) continue;
-        if (table_aligned_) {
-            SimpleVector<ValueChunkWriter*> value_chunk_writers;
-            TimeChunkWriter* time_chunk_writer = nullptr;
-            if (RET_FAIL(do_check_schema_table(device_id, tablet,
-                                               time_chunk_writer,
-                                               value_chunk_writers))) {
-                return ret;
-            }
-
-            const bool strict_page_size =
-                common::g_config_value_.strict_page_size_;
 
-            std::vector<uint32_t> field_columns;
-            field_columns.reserve(tablet.get_column_count());
-            for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
-                if (tablet.column_categories_[col] ==
-                    common::ColumnCategory::FIELD) {
-                    field_columns.push_back(col);
-                }
-            }
-            ASSERT(field_columns.size() == value_chunk_writers.size());
-
-            const bool has_varlen_field_column = [&]() {
-                for (uint32_t i = 0; i < field_columns.size(); i++) {
-                    const common::TSDataType t =
-                        tablet.schema_vec_->at(field_columns[i]).data_type_;
-                    if (t == common::STRING || t == common::TEXT ||
-                        t == common::BLOB) {
-                        return true;
-                    }
-                }
-                return false;
-            }();
+        SimpleVector<ValueChunkWriter*> value_chunk_writers;
+        TimeChunkWriter* time_chunk_writer = nullptr;
+        if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer,
+                                           value_chunk_writers))) {
+            return ret;
+        }
 
-            // Keep writers' seal-check behavior consistent across calls.
-            time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
-            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                if (!IS_NULL(value_chunk_writers[c])) {
-                    value_chunk_writers[c]->set_enable_page_seal_if_full(
-                        strict_page_size);
-                }
+        std::vector<uint32_t> field_columns;
+        field_columns.reserve(tablet.get_column_count());
+        for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+            if (tablet.column_categories_[col] ==
+                common::ColumnCategory::FIELD) {
+                field_columns.push_back(col);
             }
+        }
+        ASSERT(field_columns.size() == value_chunk_writers.size());
 
-            if (strict_page_size) {
-                // Strict: row-based insertion and force aligned page sealing
-                // when either time or any value page becomes full.
-                for (int i = start_idx; i < end_idx; i++) {
-                    int32_t time_pages_before =
-                        time_chunk_writer->num_of_pages();
-                    std::vector<int32_t> value_pages_before(
-                        value_chunk_writers.size(), 0);
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        if (!IS_NULL(value_chunk_writers[k])) {
-                            value_pages_before[k] =
-                                value_chunk_writers[k]->num_of_pages();
-                        }
-                    }
-
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[i]))) {
-                        return ret;
-                    }
-
-                    for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                        ValueChunkWriter* value_chunk_writer =
-                            value_chunk_writers[k];
-                        if (IS_NULL(value_chunk_writer)) {
-                            continue;
-                        }
-                        const uint32_t tablet_col_idx = field_columns[k];
-                        if (RET_FAIL(value_write_column(value_chunk_writer,
-                                                        tablet, tablet_col_idx,
-                                                        i, i + 1))) {
-                            return ret;
-                        }
-                    }
-
-                    if (RET_FAIL(maybe_seal_aligned_pages_together(
-                            time_chunk_writer, value_chunk_writers,
-                            time_pages_before, value_pages_before))) {
-                        return ret;
-                    }
-                }
-            } else if (!has_varlen_field_column) {
-                // Optimization: no string/blob/text columns, so we can
-                // segment by point-number and seal pages at those boundaries
-                // in column-based order.
-                const uint32_t points_per_page =
-                    common::g_config_value_.page_writer_max_point_num_;
-
-                time_chunk_writer->set_enable_page_seal_if_full(false);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
-
-                // Fill the already-unsealed time page first.
-                uint32_t time_cur_points = time_chunk_writer->get_point_numer();
-                if (time_cur_points >= points_per_page &&
-                    time_chunk_writer->has_current_page_data()) {
-                    if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                        return ret;
-                    }
-                    for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                        if (!IS_NULL(value_chunk_writers[c]) &&
-                            value_chunk_writers[c]->has_current_page_data()) {
-                            if (RET_FAIL(value_chunk_writers[c]
-                                             ->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                    }
-                    time_cur_points = 0;
-                }
-
-                const uint32_t first_seg_len =
-                    (time_cur_points > 0 && time_cur_points < points_per_page)
-                        ? (points_per_page - time_cur_points)
-                        : points_per_page;
-
-                // 1) Write time in segments (seal all full segments).
-                uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                uint32_t seg_len = first_seg_len;
-                while (static_cast<int>(seg_start) < end_idx) {
-                    const uint32_t seg_end = std::min(
-                        seg_start + seg_len, static_cast<uint32_t>(end_idx));
-                    if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
-                                                   seg_start, seg_end))) {
-                        return ret;
-                    }
-                    seg_start = seg_end;
-                    if (static_cast<int>(seg_start) < end_idx) {
-                        if (RET_FAIL(time_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                    }
-                    seg_len = points_per_page;
-                }
-
-                // 2) Write each value column (same segments).
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    seg_start = static_cast<uint32_t>(start_idx);
-                    seg_len = first_seg_len;
-                    while (static_cast<int>(seg_start) < end_idx) {
-                        const uint32_t seg_end =
-                            std::min(seg_start + seg_len,
-                                     static_cast<uint32_t>(end_idx));
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, seg_end))) {
-                            return ret;
-                        }
-                        seg_start = seg_end;
-                        if (static_cast<int>(seg_start) < end_idx) {
-                            if (value_chunk_writer->has_current_page_data() &&
-                                RET_FAIL(
-                                    value_chunk_writer->seal_current_page())) {
-                                return ret;
-                            }
-                        }
-                        seg_len = points_per_page;
-                    }
-                }
-            } else {
-                // General non-strict (may have varlen STRING/TEXT/BLOB
-                // columns): time auto-seals to provide aligned page boundaries;
-                // value writers skip auto page sealing and are sealed manually
-                // at recorded time-page boundaries. Attention: since value-side
-                // auto-seal is disabled, if a varlen value page hits the memory
-                // threshold earlier, it may not seal immediately and will be
-                // sealed later at the time-page boundaries (non-strict
-                // sacrifices the strict page size/memory limit for
-                // performance).
-                time_chunk_writer->set_enable_page_seal_if_full(true);
-                for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
-                    if (!IS_NULL(value_chunk_writers[c])) {
-                        value_chunk_writers[c]->set_enable_page_seal_if_full(
-                            false);
-                    }
-                }
+        // Precompute page boundaries from point counts — no serial write
+        // needed.  The first segment may be shorter if the time page already
+        // holds data from a previous write_table call.
+        const uint32_t page_max_points = std::max<uint32_t>(
+            1, common::g_config_value_.page_writer_max_point_num_);
+        const uint32_t si = static_cast<uint32_t>(start_idx);
+        const uint32_t ei = static_cast<uint32_t>(end_idx);
 
-                std::vector<uint32_t> time_page_row_ends;
-                const uint32_t page_max_points = std::max<uint32_t>(
-                    1, common::g_config_value_.page_writer_max_point_num_);
-                const uint32_t batch_rows =
-                    static_cast<uint32_t>(end_idx - start_idx);
-                time_page_row_ends.reserve(batch_rows / page_max_points + 1);
-                for (uint32_t r = static_cast<uint32_t>(start_idx);
-                     r < static_cast<uint32_t>(end_idx); r++) {
-                    const int32_t pages_before =
-                        time_chunk_writer->num_of_pages();
-                    if (RET_FAIL(
-                            time_chunk_writer->write(tablet.timestamps_[r]))) {
-                        return ret;
-                    }
-                    const int32_t pages_after =
-                        time_chunk_writer->num_of_pages();
-                    if (pages_after > pages_before) {
-                        const uint32_t boundary_end = r + 1;
-                        if (time_page_row_ends.empty() ||
-                            time_page_row_ends.back() != boundary_end) {
-                            time_page_row_ends.push_back(boundary_end);
-                        }
-                    }
+        uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+        const uint32_t first_seg_cap =
+            (time_cur_points > 0 && time_cur_points < page_max_points)
+                ? (page_max_points - time_cur_points)
+                : page_max_points;
+
+        std::vector<uint32_t> page_boundaries;  // row indices where a page
+                                                // should seal
+        {
+            uint32_t pos = si;
+            uint32_t seg_cap = first_seg_cap;
+            while (pos < ei) {
+                uint32_t seg_end = std::min(pos + seg_cap, ei);
+                if (seg_end < ei) {
+                    page_boundaries.push_back(seg_end);
                 }
+                pos = seg_end;
+                seg_cap = page_max_points;
+            }
+        }
 
-                // Write values column-by-column and seal at recorded time
-                // boundaries.
-                for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
-                    ValueChunkWriter* value_chunk_writer =
-                        value_chunk_writers[k];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
-                    }
-                    uint32_t seg_start = static_cast<uint32_t>(start_idx);
-                    for (uint32_t boundary_end : time_page_row_ends) {
-                        if (boundary_end <= seg_start) {
-                            continue;
-                        }
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, boundary_end))) {
-                            return ret;
-                        }
-                        if (value_chunk_writer->has_current_page_data() &&
-                            RET_FAIL(value_chunk_writer->seal_current_page())) {
-                            return ret;
-                        }
-                        seg_start = boundary_end;
-                    }
-                    if (seg_start < static_cast<uint32_t>(end_idx)) {
-                        if (RET_FAIL(value_write_column(
-                                value_chunk_writer, tablet, field_columns[k],
-                                seg_start, static_cast<uint32_t>(end_idx)))) {
-                            return ret;
-                        }
-                    }
-                }
+        // Write one column in segments defined by page_boundaries, sealing
+        // at each boundary.  Works for both time and value columns.
+        // We control page sealing explicitly at precomputed boundaries, so
+        // auto-seal must be disabled — otherwise a segment of exactly
+        // page_max_points would trigger auto-seal AND our explicit seal,
+        // double-sealing (sealing an empty page → crash).
+        auto write_time_in_segments = [this, &tablet, &page_boundaries, si,
+                                       ei](TimeChunkWriter* tcw) -> int {
+            int r = E_OK;
+            tcw->set_enable_page_seal_if_full(false);
+            uint32_t seg_start = si;
+            for (uint32_t boundary : page_boundaries) {
+                if ((r = time_write_column(tcw, tablet, seg_start, boundary)) !=
+                    E_OK)
+                    return r;
+                if ((r = tcw->seal_current_page()) != E_OK) return r;
+                seg_start = boundary;
             }
-            start_idx = end_idx;
-        } else {
-            MeasurementNamesFromTablet mnames_getter(tablet);
-            SimpleVector<ChunkWriter*> chunk_writers;
-            SimpleVector<common::TSDataType> data_types;
-            if (RET_FAIL(do_check_schema(device_id, mnames_getter,
-                                         chunk_writers, data_types))) {
+            if (seg_start < ei) {
+                r = time_write_column(tcw, tablet, seg_start, ei);
+            }
+            tcw->set_enable_page_seal_if_full(true);
+            return r;
+        };
+
+        auto write_value_in_segments = [this, &tablet, &page_boundaries, si,
+                                        ei](ValueChunkWriter* vcw,
+                                            uint32_t col_idx) -> int {
+            int r = E_OK;
+            vcw->set_enable_page_seal_if_full(false);
+            uint32_t seg_start = si;
+            for (uint32_t boundary : page_boundaries) {
+                if ((r = value_write_column(vcw, tablet, col_idx, seg_start,
+                                            boundary)) != E_OK)
+                    return r;
+                if (vcw->has_current_page_data() &&
+                    (r = vcw->seal_current_page()) != E_OK)
+                    return r;
+                seg_start = boundary;
+            }
+            if (seg_start < ei) {
+                r = value_write_column(vcw, tablet, col_idx, seg_start, ei);
+            }
+            vcw->set_enable_page_seal_if_full(true);
+            return r;

Review Comment:
   `write_value_in_segments` restores `vcw->set_enable_page_seal_if_full(true)` 
unconditionally. This can override the intended `strict_page_size_` setting and 
makes the writer state depend on whether `write_table()` ran. Restore to the 
configured value (or the previous state) instead of forcing `true`.



##########
cpp/CMakeLists.txt:
##########
@@ -150,6 +150,13 @@ if (ENABLE_ZLIB)
     add_definitions(-DENABLE_GZIP)
 endif()
 
+option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)" ON)
+message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}")
+
+if (ENABLE_THREADS)
+    add_definitions(-DENABLE_THREADS)

Review Comment:
   `ENABLE_THREADS` adds a compile definition, but the build does not link 
against the platform thread library (e.g., `-pthread` / `Threads::Threads`). On 
many toolchains this will cause unresolved symbols when using `std::thread`. 
When `ENABLE_THREADS` is ON, `find_package(Threads REQUIRED)` and link 
`Threads::Threads` to the relevant targets (at least `tsfile` and any object 
libs that end up in it).
   ```suggestion
       add_definitions(-DENABLE_THREADS)
       find_package(Threads REQUIRED)
       link_libraries(Threads::Threads)
   ```



##########
cpp/src/writer/tsfile_writer.h:
##########
@@ -194,7 +198,10 @@ class TsFileWriter {
     int64_t record_count_for_next_mem_check_;
     bool write_file_created_;
     bool io_writer_owned_;  // false when init(RestorableTsFileIOWriter*)
-    bool table_aligned_ = true;
+#ifdef ENABLE_THREADS
+    common::ThreadPool thread_pool_{
+        (size_t)common::g_config_value_.write_thread_count_};

Review Comment:
   `thread_pool_` is constructed using the current value of 
`common::g_config_value_.write_thread_count_` at member-initialization time. 
This makes the pool size effectively fixed per `TsFileWriter` instance and can 
also be problematic if the global config hasn't been initialized yet (or is 
accidentally 0), leading to a pool with 0 workers and tasks that never run. 
Consider clamping to >=1 and initializing the pool in the constructor after 
`libtsfile_init()`/config init (or lazily on first parallel use), and clarify 
how runtime changes to `write_thread_count_` are intended to take effect.
   ```suggestion
       static size_t get_thread_pool_size() {
           const int thread_count = common::g_config_value_.write_thread_count_;
           return thread_count > 0 ? static_cast<size_t>(thread_count) : size_t{1};
       }
   
       common::ThreadPool thread_pool_{get_thread_pool_size()};
   ```



##########
cpp/src/common/global.h:
##########
@@ -163,6 +163,20 @@ FORCE_INLINE uint8_t get_global_compression() {
     return static_cast<uint8_t>(g_config_value_.default_compression_type_);
 }
 
+FORCE_INLINE void set_parallel_write_enabled(bool enabled) {
+    g_config_value_.parallel_write_enabled_ = enabled;
+}
+
+FORCE_INLINE bool get_parallel_write_enabled() {
+    return g_config_value_.parallel_write_enabled_;
+}
+
+FORCE_INLINE int set_write_thread_count(int32_t count) {
+    if (count < 1 || count > 64) return E_INVALID_ARG;
+    g_config_value_.write_thread_count_ = count;
+    return E_OK;
+}

Review Comment:
   `set_write_thread_count()` updates the global config, but `TsFileWriter`'s 
`thread_pool_` is constructed once and never resized. As written, calling this 
at runtime will not affect existing writers and may mislead API consumers. 
Either document that it must be called before constructing writers, or adjust 
the writer/pool to honor runtime updates.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to