This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 87f66d99 Write with parallel (#772)
87f66d99 is described below
commit 87f66d999fc3e220caf8d4546f249dc187bedaf2
Author: Colin Lee <[email protected]>
AuthorDate: Fri Apr 10 18:23:58 2026 +0800
Write with parallel (#772)
* support write parallel.
* fix format.
* fix seg segv.
* fix comment.
* add readme.
* use global write thread pool.
---
cpp/CMakeLists.txt | 9 +
cpp/README-zh.md | 29 ++
cpp/README.md | 27 ++
cpp/src/common/config/config.h | 2 +
cpp/src/common/global.cc | 23 ++
cpp/src/common/global.h | 24 ++
cpp/src/common/tablet.h | 7 +
cpp/src/common/thread_pool.h | 123 +++++++
cpp/src/writer/tsfile_writer.cc | 394 ++++++++-------------
cpp/src/writer/tsfile_writer.h | 2 +-
.../writer/table_view/tsfile_writer_table_test.cc | 47 +--
11 files changed, 415 insertions(+), 272 deletions(-)
diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt
index 9d2015e4..3f1d8bdd 100755
--- a/cpp/CMakeLists.txt
+++ b/cpp/CMakeLists.txt
@@ -150,6 +150,15 @@ if (ENABLE_ZLIB)
add_definitions(-DENABLE_GZIP)
endif()
+option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)"
ON)
+message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}")
+
+if (ENABLE_THREADS)
+ add_definitions(-DENABLE_THREADS)
+ find_package(Threads REQUIRED)
+ link_libraries(Threads::Threads)
+endif()
+
option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF)
message("cmake using: ENABLE_SIMDE=${ENABLE_SIMDE}")
diff --git a/cpp/README-zh.md b/cpp/README-zh.md
index 2a8c84b7..1ff0372f 100644
--- a/cpp/README-zh.md
+++ b/cpp/README-zh.md
@@ -105,6 +105,35 @@ make
---
+## 并行写入
+
+TsFile C++ 支持基于线程池的列级并行编码,适用于表模型写入路径(`write_table`)。启用后,时间列和所有值列使用预计算的 page 边界并行写入,同时保证各列 page 对齐封盘。
+
+### 编译选项
+
+并行写入通过 `ENABLE_THREADS` CMake 选项控制(默认开启):
+
+```bash
+cmake .. -DENABLE_THREADS=ON # 开启(默认)
+cmake .. -DENABLE_THREADS=OFF # 关闭——编译期剥离所有线程代码
+```
+
+### 运行时配置
+
+```cpp
+#include "common/global.h"
+
+// 运行时开启或关闭并行写入(单核机器自动禁用)
+common::set_parallel_write_enabled(true);
+
+// 设置工作线程数(取值范围 1–64;必须在 libtsfile_init() 之前调用——全局写线程池在初始化时创建,之后不会调整大小)
+common::set_write_thread_count(4);
+```
+
+默认情况下,当机器 CPU 核数大于 1 时自动启用并行写入,线程数设为硬件核数(上限 64)。
+
+---
+
## 使用 TsFile
你可以在 `./examples/cpp_examples` 目录下的 `demo_read.cpp` 和 `demo_write.cpp` 中查看读写数据的示例。
diff --git a/cpp/README.md b/cpp/README.md
index e328413c..9ee9d7c9 100644
--- a/cpp/README.md
+++ b/cpp/README.md
@@ -75,6 +75,33 @@ cmake .. -DToolChain=ON
make
```
+## Parallel Write
+
+TsFile C++ supports thread pool-based parallel column encoding for the table write path (`write_table`). When enabled, each column (time and value columns) is written in parallel using precomputed page boundaries, while maintaining aligned page sealing across columns.
+
+### Build Options
+
+Parallel write is controlled by the `ENABLE_THREADS` CMake option (ON by default):
+
+```bash
+cmake .. -DENABLE_THREADS=ON # enable (default)
+cmake .. -DENABLE_THREADS=OFF # disable — all thread code is stripped at compile time
+```
+
+### Runtime Configuration
+
+```cpp
+#include "common/global.h"
+
+// Enable or disable parallel write at runtime (auto-disabled on single-core machines)
+common::set_parallel_write_enabled(true);
+
+// Set the number of worker threads (1–64). Must be called before libtsfile_init(): the global write thread pool is created during initialization and is not resized afterwards.
+common::set_write_thread_count(4);
+```
+
+By default, parallel write is enabled when the machine has more than one CPU core, and the thread count is set to the number of hardware cores (capped at 64).
+
## Use TsFile
You can find examples on how to read and write data in `demo_read.cpp` and `demo_write.cpp` located under `./examples/cpp_examples`. There are also examples under `./examples/c_examples` on how to use a C-style API to read and write data in a C environment. You can run `bash build.sh` under `./examples` to generate an executable output under `./examples/build`.
\ No newline at end of file
diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h
index 81dad924..e2b2039a 100644
--- a/cpp/src/common/config/config.h
+++ b/cpp/src/common/config/config.h
@@ -46,6 +46,8 @@ typedef struct ConfigValue {
TSEncoding double_encoding_type_;
TSEncoding string_encoding_type_;
CompressionType default_compression_type_;
+ bool parallel_write_enabled_;
+ int32_t write_thread_count_;
// When true, aligned writer enforces page size limit strictly by
// interleaving time/value writes and sealing pages together when any side
// becomes full.
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index 91ecedda..ea4bf128 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -24,11 +24,19 @@
#endif
#include <stdlib.h>
+#include <thread>
+
+#ifdef ENABLE_THREADS
+#include "common/thread_pool.h"
+#endif
#include "utils/injection.h"
namespace common {
ColumnSchema g_time_column_schema;
+#ifdef ENABLE_THREADS
+ThreadPool* g_write_thread_pool_ = nullptr;
+#endif
ConfigValue g_config_value_;
void init_config_value() {
@@ -54,12 +62,18 @@ void init_config_value() {
g_config_value_.int64_encoding_type_ = TS_2DIFF;
g_config_value_.float_encoding_type_ = GORILLA;
g_config_value_.double_encoding_type_ = GORILLA;
+ g_config_value_.string_encoding_type_ = PLAIN;
// Default compression type is LZ4
#ifdef ENABLE_LZ4
g_config_value_.default_compression_type_ = LZ4;
#else
g_config_value_.default_compression_type_ = UNCOMPRESSED;
#endif
+ unsigned int hw_cores = std::thread::hardware_concurrency();
+ if (hw_cores == 0) hw_cores = 1; // fallback if detection fails
+ g_config_value_.parallel_write_enabled_ = (hw_cores > 1);
+ g_config_value_.write_thread_count_ =
+ static_cast<int32_t>(std::min(hw_cores, 64u));
// Enforce aligned page size limits strictly by default.
g_config_value_.strict_page_size_ = true;
}
@@ -129,6 +143,15 @@ int init_common() {
g_time_column_schema.encoding_ = PLAIN;
g_time_column_schema.compression_ = UNCOMPRESSED;
g_time_column_schema.column_name_ = storage::TIME_COLUMN_NAME;
+#ifdef ENABLE_THREADS
+ // (Re)create the global write thread pool with the configured size.
+ delete g_write_thread_pool_;
+ size_t pool_size =
+ g_config_value_.write_thread_count_ > 0
+ ? static_cast<size_t>(g_config_value_.write_thread_count_)
+ : size_t{1};
+ g_write_thread_pool_ = new ThreadPool(pool_size);
+#endif
return ret;
}
diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h
index 7937e920..ba5f4bd4 100644
--- a/cpp/src/common/global.h
+++ b/cpp/src/common/global.h
@@ -163,6 +163,30 @@ FORCE_INLINE uint8_t get_global_compression() {
return static_cast<uint8_t>(g_config_value_.default_compression_type_);
}
+FORCE_INLINE void set_parallel_write_enabled(bool enabled) {
+ g_config_value_.parallel_write_enabled_ = enabled;
+}
+
+FORCE_INLINE bool get_parallel_write_enabled() {
+ return g_config_value_.parallel_write_enabled_ &&
+ g_config_value_.write_thread_count_ > 1;
+}
+
+// Set the number of threads for parallel writes. Must be called before
+// init_common() / libtsfile_init() — the global thread pool is created
+// during initialization and is not resized at runtime.
+FORCE_INLINE int set_write_thread_count(int32_t count) {
+ if (count < 1 || count > 64) return E_INVALID_ARG;
+ g_config_value_.write_thread_count_ = count;
+ return E_OK;
+}
+
+#ifdef ENABLE_THREADS
+class ThreadPool;
+// Global write thread pool, created by init_common().
+extern ThreadPool* g_write_thread_pool_;
+#endif
+
extern int init_common();
extern bool is_timestamp_column_name(const char* time_col_name);
extern void cols_to_json(ByteStream* byte_stream,
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index beedacc0..50750d02 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -46,6 +46,7 @@ class TabletColIterator;
* with their associated metadata such as column names and types.
*/
class Tablet {
+ public:
// Arrow-style string column: offsets + contiguous buffer.
// string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
struct StringColumn {
@@ -235,6 +236,12 @@ class Tablet {
}
size_t get_column_count() const { return schema_vec_->size(); }
uint32_t get_cur_row_size() const { return cur_row_size_; }
+ int64_t get_timestamp(uint32_t row_index) const {
+ return timestamps_[row_index];
+ }
+ bool is_null(uint32_t row_index, uint32_t col_index) const {
+ return bitmaps_[col_index].test(row_index);
+ }
/**
* @brief Adds a timestamp to the specified row.
diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h
new file mode 100644
index 00000000..f82aea03
--- /dev/null
+++ b/cpp/src/common/thread_pool.h
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#ifndef COMMON_THREAD_POOL_H
+#define COMMON_THREAD_POOL_H
+
+#ifdef ENABLE_THREADS
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <thread>
+#include <type_traits>
+#include <vector>
+
+namespace common {
+
+// Unified fixed-size thread pool supporting both fire-and-forget tasks
+// (submit void + wait_all) and future-returning tasks (submit<F>).
+// Used by both write path (column-parallel encoding) and read path
+// (column-parallel decoding).
+class ThreadPool {
+ public:
+ explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) {
+ for (size_t i = 0; i < num_threads; i++) {
+ workers_.emplace_back([this] { worker_loop(); });
+ }
+ }
+
+ ~ThreadPool() {
+ {
+ std::lock_guard<std::mutex> lk(mu_);
+ stop_ = true;
+ }
+ cv_work_.notify_all();
+ for (auto& w : workers_) {
+ if (w.joinable()) w.join();
+ }
+ }
+
+ // Submit a fire-and-forget task (no return value).
+ void submit(std::function<void()> task) {
+ {
+ std::lock_guard<std::mutex> lk(mu_);
+ tasks_.push(std::move(task));
+ active_++;
+ }
+ cv_work_.notify_one();
+ }
+
+ // Submit a task that returns a value via std::future.
+ template <typename F>
+ std::future<typename std::result_of<F()>::type> submit(F&& f) {
+ using RetType = typename std::result_of<F()>::type;
+ auto task =
+
std::make_shared<std::packaged_task<RetType()>>(std::forward<F>(f));
+ std::future<RetType> result = task->get_future();
+ {
+ std::lock_guard<std::mutex> lk(mu_);
+ tasks_.emplace([task]() { (*task)(); });
+ active_++;
+ }
+ cv_work_.notify_one();
+ return result;
+ }
+
+ // Block until all submitted tasks have completed.
+ void wait_all() {
+ std::unique_lock<std::mutex> lk(mu_);
+ cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); });
+ }
+
+ private:
+ void worker_loop() {
+ while (true) {
+ std::function<void()> task;
+ {
+ std::unique_lock<std::mutex> lk(mu_);
+ cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); });
+ if (stop_ && tasks_.empty()) return;
+ task = std::move(tasks_.front());
+ tasks_.pop();
+ }
+ task();
+ {
+ std::lock_guard<std::mutex> lk(mu_);
+ active_--;
+ }
+ cv_done_.notify_one();
+ }
+ }
+
+ std::vector<std::thread> workers_;
+ std::queue<std::function<void()>> tasks_;
+ std::mutex mu_;
+ std::condition_variable cv_work_;
+ std::condition_variable cv_done_;
+ bool stop_;
+ int active_;
+};
+
+} // namespace common
+
+#endif // ENABLE_THREADS
+
+#endif // COMMON_THREAD_POOL_H
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 657fcabc..35520c88 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -23,6 +23,9 @@
#include "chunk_writer.h"
#include "common/config/config.h"
+#ifdef ENABLE_THREADS
+#include "common/thread_pool.h"
+#endif
#include "file/restorable_tsfile_io_writer.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
@@ -50,6 +53,10 @@ int libtsfile_init() {
}
void libtsfile_destroy() {
+#ifdef ENABLE_THREADS
+ delete common::g_write_thread_pool_;
+ common::g_write_thread_pool_ = nullptr;
+#endif
ModStat::get_instance().destroy();
libtsfile::g_s_is_inited = false;
}
@@ -1142,274 +1149,159 @@ int TsFileWriter::write_table(Tablet& tablet) {
auto device_id = device_id_end_index_pair.first;
int end_idx = device_id_end_index_pair.second;
if (end_idx == 0) continue;
- if (table_aligned_) {
- SimpleVector<ValueChunkWriter*> value_chunk_writers;
- TimeChunkWriter* time_chunk_writer = nullptr;
- if (RET_FAIL(do_check_schema_table(device_id, tablet,
- time_chunk_writer,
- value_chunk_writers))) {
- return ret;
- }
- const bool strict_page_size =
- common::g_config_value_.strict_page_size_;
+ SimpleVector<ValueChunkWriter*> value_chunk_writers;
+ TimeChunkWriter* time_chunk_writer = nullptr;
+ if (RET_FAIL(do_check_schema_table(device_id, tablet,
time_chunk_writer,
+ value_chunk_writers))) {
+ return ret;
+ }
- std::vector<uint32_t> field_columns;
- field_columns.reserve(tablet.get_column_count());
- for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
- if (tablet.column_categories_[col] ==
- common::ColumnCategory::FIELD) {
- field_columns.push_back(col);
- }
+ std::vector<uint32_t> field_columns;
+ field_columns.reserve(tablet.get_column_count());
+ for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+ if (tablet.column_categories_[col] ==
+ common::ColumnCategory::FIELD) {
+ field_columns.push_back(col);
}
- ASSERT(field_columns.size() == value_chunk_writers.size());
-
- const bool has_varlen_field_column = [&]() {
- for (uint32_t i = 0; i < field_columns.size(); i++) {
- const common::TSDataType t =
- tablet.schema_vec_->at(field_columns[i]).data_type_;
- if (t == common::STRING || t == common::TEXT ||
- t == common::BLOB) {
- return true;
- }
- }
- return false;
- }();
-
- // Keep writers' seal-check behavior consistent across calls.
- time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
- for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
- if (!IS_NULL(value_chunk_writers[c])) {
- value_chunk_writers[c]->set_enable_page_seal_if_full(
- strict_page_size);
+ }
+ ASSERT(field_columns.size() == value_chunk_writers.size());
+
+ // Precompute page boundaries from point counts — no serial write
+ // needed. The first segment may be shorter if the time page already
+ // holds data from a previous write_table call.
+ const uint32_t page_max_points = std::max<uint32_t>(
+ 1, common::g_config_value_.page_writer_max_point_num_);
+ const uint32_t si = static_cast<uint32_t>(start_idx);
+ const uint32_t ei = static_cast<uint32_t>(end_idx);
+
+ // If the current unsealed page is already at or past capacity (from
+ // a previous write_table call), seal it before starting new segments.
+ uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+ if (time_cur_points >= page_max_points) {
+ if (time_chunk_writer->has_current_page_data()) {
+ if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
}
}
-
- if (strict_page_size) {
- // Strict: row-based insertion and force aligned page sealing
- // when either time or any value page becomes full.
- for (int i = start_idx; i < end_idx; i++) {
- int32_t time_pages_before =
- time_chunk_writer->num_of_pages();
- std::vector<int32_t> value_pages_before(
- value_chunk_writers.size(), 0);
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
- if (!IS_NULL(value_chunk_writers[k])) {
- value_pages_before[k] =
- value_chunk_writers[k]->num_of_pages();
- }
- }
-
- if (RET_FAIL(
- time_chunk_writer->write(tablet.timestamps_[i]))) {
- return ret;
- }
-
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
- ValueChunkWriter* value_chunk_writer =
- value_chunk_writers[k];
- if (IS_NULL(value_chunk_writer)) {
- continue;
- }
- const uint32_t tablet_col_idx = field_columns[k];
- if (RET_FAIL(value_write_column(value_chunk_writer,
- tablet, tablet_col_idx,
- i, i + 1))) {
- return ret;
- }
- }
-
- if (RET_FAIL(maybe_seal_aligned_pages_together(
- time_chunk_writer, value_chunk_writers,
- time_pages_before, value_pages_before))) {
+ for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+ if (!IS_NULL(value_chunk_writers[k]) &&
+ value_chunk_writers[k]->has_current_page_data()) {
+ if (RET_FAIL(value_chunk_writers[k]->seal_current_page()))
{
return ret;
}
}
- } else if (!has_varlen_field_column) {
- // Optimization: no string/blob/text columns, so we can
- // segment by point-number and seal pages at those boundaries
- // in column-based order.
- const uint32_t points_per_page =
- common::g_config_value_.page_writer_max_point_num_;
-
- time_chunk_writer->set_enable_page_seal_if_full(false);
- for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
- if (!IS_NULL(value_chunk_writers[c])) {
- value_chunk_writers[c]->set_enable_page_seal_if_full(
- false);
- }
- }
-
- // Fill the already-unsealed time page first.
- uint32_t time_cur_points =
time_chunk_writer->get_point_numer();
- if (time_cur_points >= points_per_page &&
- time_chunk_writer->has_current_page_data()) {
- if (RET_FAIL(time_chunk_writer->seal_current_page())) {
- return ret;
- }
- for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
- if (!IS_NULL(value_chunk_writers[c]) &&
- value_chunk_writers[c]->has_current_page_data()) {
- if (RET_FAIL(value_chunk_writers[c]
- ->seal_current_page())) {
- return ret;
- }
- }
- }
- time_cur_points = 0;
- }
-
- const uint32_t first_seg_len =
- (time_cur_points > 0 && time_cur_points < points_per_page)
- ? (points_per_page - time_cur_points)
- : points_per_page;
-
- // 1) Write time in segments (seal all full segments).
- uint32_t seg_start = static_cast<uint32_t>(start_idx);
- uint32_t seg_len = first_seg_len;
- while (static_cast<int>(seg_start) < end_idx) {
- const uint32_t seg_end = std::min(
- seg_start + seg_len, static_cast<uint32_t>(end_idx));
- if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
- seg_start, seg_end))) {
- return ret;
- }
- seg_start = seg_end;
- if (static_cast<int>(seg_start) < end_idx) {
- if (RET_FAIL(time_chunk_writer->seal_current_page())) {
- return ret;
- }
- }
- seg_len = points_per_page;
- }
-
- // 2) Write each value column (same segments).
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
- ValueChunkWriter* value_chunk_writer =
- value_chunk_writers[k];
- if (IS_NULL(value_chunk_writer)) {
- continue;
- }
- seg_start = static_cast<uint32_t>(start_idx);
- seg_len = first_seg_len;
- while (static_cast<int>(seg_start) < end_idx) {
- const uint32_t seg_end =
- std::min(seg_start + seg_len,
- static_cast<uint32_t>(end_idx));
- if (RET_FAIL(value_write_column(
- value_chunk_writer, tablet, field_columns[k],
- seg_start, seg_end))) {
- return ret;
- }
- seg_start = seg_end;
- if (static_cast<int>(seg_start) < end_idx) {
- if (value_chunk_writer->has_current_page_data() &&
- RET_FAIL(
- value_chunk_writer->seal_current_page())) {
- return ret;
- }
- }
- seg_len = points_per_page;
- }
- }
- } else {
- // General non-strict (may have varlen STRING/TEXT/BLOB
- // columns): time auto-seals to provide aligned page
boundaries;
- // value writers skip auto page sealing and are sealed manually
- // at recorded time-page boundaries. Attention: since
value-side
- // auto-seal is disabled, if a varlen value page hits the
memory
- // threshold earlier, it may not seal immediately and will be
- // sealed later at the time-page boundaries (non-strict
- // sacrifices the strict page size/memory limit for
- // performance).
- time_chunk_writer->set_enable_page_seal_if_full(true);
- for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
- if (!IS_NULL(value_chunk_writers[c])) {
- value_chunk_writers[c]->set_enable_page_seal_if_full(
- false);
- }
- }
-
- std::vector<uint32_t> time_page_row_ends;
- const uint32_t page_max_points = std::max<uint32_t>(
- 1, common::g_config_value_.page_writer_max_point_num_);
- const uint32_t batch_rows =
- static_cast<uint32_t>(end_idx - start_idx);
- time_page_row_ends.reserve(batch_rows / page_max_points + 1);
- for (uint32_t r = static_cast<uint32_t>(start_idx);
- r < static_cast<uint32_t>(end_idx); r++) {
- const int32_t pages_before =
- time_chunk_writer->num_of_pages();
- if (RET_FAIL(
- time_chunk_writer->write(tablet.timestamps_[r]))) {
- return ret;
- }
- const int32_t pages_after =
- time_chunk_writer->num_of_pages();
- if (pages_after > pages_before) {
- const uint32_t boundary_end = r + 1;
- if (time_page_row_ends.empty() ||
- time_page_row_ends.back() != boundary_end) {
- time_page_row_ends.push_back(boundary_end);
- }
- }
+ }
+ time_cur_points = 0;
+ }
+ const uint32_t first_seg_cap =
+ (time_cur_points > 0 && time_cur_points < page_max_points)
+ ? (page_max_points - time_cur_points)
+ : page_max_points;
+
+ std::vector<uint32_t> page_boundaries; // row indices where a page
+ // should seal
+ {
+ uint32_t pos = si;
+ uint32_t seg_cap = first_seg_cap;
+ while (pos < ei) {
+ uint32_t seg_end = std::min(pos + seg_cap, ei);
+ if (seg_end < ei) {
+ page_boundaries.push_back(seg_end);
}
+ pos = seg_end;
+ seg_cap = page_max_points;
+ }
+ }
- // Write values column-by-column and seal at recorded time
- // boundaries.
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
- ValueChunkWriter* value_chunk_writer =
- value_chunk_writers[k];
- if (IS_NULL(value_chunk_writer)) {
- continue;
- }
- uint32_t seg_start = static_cast<uint32_t>(start_idx);
- for (uint32_t boundary_end : time_page_row_ends) {
- if (boundary_end <= seg_start) {
- continue;
- }
- if (RET_FAIL(value_write_column(
- value_chunk_writer, tablet, field_columns[k],
- seg_start, boundary_end))) {
- return ret;
- }
- if (value_chunk_writer->has_current_page_data() &&
- RET_FAIL(value_chunk_writer->seal_current_page()))
{
- return ret;
- }
- seg_start = boundary_end;
- }
- if (seg_start < static_cast<uint32_t>(end_idx)) {
- if (RET_FAIL(value_write_column(
- value_chunk_writer, tablet, field_columns[k],
- seg_start, static_cast<uint32_t>(end_idx)))) {
- return ret;
- }
- }
- }
+ // We control page sealing explicitly at precomputed boundaries, so
+ // auto-seal must be disabled during segmented writes — otherwise a
+ // segment of exactly page_max_points would trigger auto-seal AND
+ // our explicit seal, double-sealing (sealing an empty page → crash).
+ // Note: with auto-seal off, the memory-based threshold
+ // (page_writer_max_memory_bytes_) is not enforced within a segment.
+ // For varlen columns (STRING/TEXT/BLOB), individual pages may exceed
+ // the memory limit. Each segment is still bounded by
+ // page_max_points rows, keeping pages within a reasonable size.
+ auto write_time_in_segments = [this, &tablet, &page_boundaries, si,
+ ei](TimeChunkWriter* tcw) -> int {
+ int r = E_OK;
+ tcw->set_enable_page_seal_if_full(false);
+ uint32_t seg_start = si;
+ for (uint32_t boundary : page_boundaries) {
+ if ((r = time_write_column(tcw, tablet, seg_start, boundary))
!=
+ E_OK)
+ return r;
+ if ((r = tcw->seal_current_page()) != E_OK) return r;
+ seg_start = boundary;
}
- start_idx = end_idx;
- } else {
- MeasurementNamesFromTablet mnames_getter(tablet);
- SimpleVector<ChunkWriter*> chunk_writers;
- SimpleVector<common::TSDataType> data_types;
- if (RET_FAIL(do_check_schema(device_id, mnames_getter,
- chunk_writers, data_types))) {
+ if (seg_start < ei) {
+ r = time_write_column(tcw, tablet, seg_start, ei);
+ }
+ tcw->set_enable_page_seal_if_full(true);
+ return r;
+ };
+
+ auto write_value_in_segments = [this, &tablet, &page_boundaries, si,
+ ei](ValueChunkWriter* vcw,
+ uint32_t col_idx) -> int {
+ int r = E_OK;
+ vcw->set_enable_page_seal_if_full(false);
+ uint32_t seg_start = si;
+ for (uint32_t boundary : page_boundaries) {
+ if ((r = value_write_column(vcw, tablet, col_idx, seg_start,
+ boundary)) != E_OK)
+ return r;
+ if (vcw->has_current_page_data() &&
+ (r = vcw->seal_current_page()) != E_OK)
+ return r;
+ seg_start = boundary;
+ }
+ if (seg_start < ei) {
+ r = value_write_column(vcw, tablet, col_idx, seg_start, ei);
+ }
+ vcw->set_enable_page_seal_if_full(true);
+ return r;
+ };
+
+ // All columns (time + values) write the same row segments and seal
+ // at the same boundaries — fully parallel.
+#ifdef ENABLE_THREADS
+ if (g_config_value_.parallel_write_enabled_) {
+ std::vector<std::future<int>> futures;
+ futures.push_back(g_write_thread_pool_->submit(
+ [&write_time_in_segments, time_chunk_writer]() {
+ return write_time_in_segments(time_chunk_writer);
+ }));
+ for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+ ValueChunkWriter* vcw = value_chunk_writers[k];
+ if (IS_NULL(vcw)) continue;
+ uint32_t col_idx = field_columns[k];
+ futures.push_back(g_write_thread_pool_->submit(
+ [&write_value_in_segments, vcw, col_idx]() {
+ return write_value_in_segments(vcw, col_idx);
+ }));
+ }
+ for (auto& f : futures) {
+ int r = f.get();
+ if (r != E_OK && ret == E_OK) ret = r;
+ }
+ if (ret != E_OK) return ret;
+ } else
+#endif
+ {
+ if (RET_FAIL(write_time_in_segments(time_chunk_writer))) {
return ret;
}
- ASSERT(chunk_writers.size() == tablet.get_column_count());
- for (uint32_t c = 0; c < chunk_writers.size(); c++) {
- ChunkWriter* chunk_writer = chunk_writers[c];
- if (IS_NULL(chunk_writer)) {
- continue;
- }
- if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx,
- device_id_end_index_pair.second))) {
+ for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+ ValueChunkWriter* vcw = value_chunk_writers[k];
+ if (IS_NULL(vcw)) continue;
+ if (RET_FAIL(write_value_in_segments(vcw, field_columns[k]))) {
return ret;
}
}
- start_idx = device_id_end_index_pair.second;
}
+ start_idx = end_idx;
}
record_count_since_last_flush_ += tablet.cur_row_size_;
// Reset string column buffers so the tablet can be reused for the next
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index 01028e2e..cb1b952e 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -22,6 +22,7 @@
#include <fcntl.h>
#include <climits>
+#include <future>
#include <map>
#include <memory>
#include <string>
@@ -194,7 +195,6 @@ class TsFileWriter {
int64_t record_count_for_next_mem_check_;
bool write_file_created_;
bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*)
- bool table_aligned_ = true;
int write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps, bool* col_values,
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index 477f875e..d1f3b92e 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -20,6 +20,7 @@
#include <random>
+#include "common/global.h"
#include "common/record.h"
#include "common/schema.h"
#include "common/tablet.h"
@@ -31,10 +32,11 @@
using namespace storage;
using namespace common;
-class TsFileWriterTableTest : public ::testing::Test {
+class TsFileWriterTableTest : public ::testing::TestWithParam<bool> {
protected:
void SetUp() override {
libtsfile_init();
+ set_parallel_write_enabled(GetParam());
file_name_ = std::string("tsfile_writer_table_test_") +
generate_random_string(10) + std::string(".tsfile");
remove(file_name_.c_str());
@@ -133,7 +135,7 @@ class TsFileWriterTableTest : public ::testing::Test {
}
};
-TEST_F(TsFileWriterTableTest, WriteTableTest) {
+TEST_P(TsFileWriterTableTest, WriteTableTest) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -144,7 +146,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTest) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) {
+TEST_P(TsFileWriterTableTest, WithoutTagAndMultiPage) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(1);
@@ -192,7 +194,7 @@ TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WriteDisorderTest) {
+TEST_P(TsFileWriterTableTest, WriteDisorderTest) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -242,7 +244,7 @@ TEST_F(TsFileWriterTableTest, WriteDisorderTest) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) {
+TEST_P(TsFileWriterTableTest, WriteTableTestMultiFlush) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
&write_file_, table_schema, 2 * 1024);
@@ -255,7 +257,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) {
+TEST_P(TsFileWriterTableTest, WriteNonExistColumnTest) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -283,7 +285,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) {
+TEST_P(TsFileWriterTableTest, WriteNonExistTableTest) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -295,7 +297,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) {
+TEST_P(TsFileWriterTableTest, WriterWithMemoryThreshold) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
&write_file_, table_schema, 256 * 1024 * 1024);
@@ -305,7 +307,7 @@ TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, EmptyTagWrite) {
+TEST_P(TsFileWriterTableTest, EmptyTagWrite) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(3);
@@ -361,7 +363,7 @@ TEST_F(TsFileWriterTableTest, EmptyTagWrite) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
+TEST_P(TsFileWriterTableTest, WritehDataTypeMisMatch) {
auto table_schema = gen_table_schema(0);
auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
&write_file_, table_schema, 256 * 1024 * 1024);
@@ -412,7 +414,7 @@ TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) {
tsfile_table_writer_->close();
}
-TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
+TEST_P(TsFileWriterTableTest, WriteAndReadSimple) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(2);
@@ -467,7 +469,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
+TEST_P(TsFileWriterTableTest, DuplicateColumnName) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
measurement_schemas.resize(3);
@@ -505,7 +507,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
+TEST_P(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
for (int i = 0; i < 3; i++) {
@@ -637,7 +639,7 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
ASSERT_EQ(reader.close(), common::E_OK);
}
-TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
+TEST_P(TsFileWriterTableTest, MultiDeviceMultiFields) {
common::config_set_max_degree_of_index_node(5);
auto table_schema = gen_table_schema(0, 1, 100);
auto tsfile_table_writer_ =
@@ -696,7 +698,7 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
delete table_schema;
}
-TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
+TEST_P(TsFileWriterTableTest, WriteDataWithEmptyField) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
for (int i = 0; i < 3; i++) {
@@ -773,7 +775,7 @@ TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
ASSERT_EQ(reader.close(), common::E_OK);
}
-TEST_F(TsFileWriterTableTest, MultiDatatypes) {
+TEST_P(TsFileWriterTableTest, MultiDatatypes) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
@@ -877,7 +879,7 @@ TEST_F(TsFileWriterTableTest, MultiDatatypes) {
delete[] literal;
}
-TEST_F(TsFileWriterTableTest, DiffCodecTypes) {
+TEST_P(TsFileWriterTableTest, DiffCodecTypes) {
std::vector<MeasurementSchema*> measurement_schemas;
std::vector<ColumnCategory> column_categories;
@@ -985,7 +987,7 @@ TEST_F(TsFileWriterTableTest, DiffCodecTypes) {
delete[] literal;
}
-TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) {
+TEST_P(TsFileWriterTableTest, EncodingConfigIntegration) {
// 1. Test setting global compression type
ASSERT_EQ(E_OK, set_global_compression(SNAPPY));
@@ -1098,7 +1100,7 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) {
}
#ifdef ENABLE_MEM_STAT
-TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) {
+TEST_P(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) {
TableSchema* table_schema = gen_table_schema(0, 2, 3);
auto tsfile_table_writer =
std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
@@ -1098,7 +1100,7 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) {
delete table_schema;
}
-#endif
\ No newline at end of file
+#endif
+
+INSTANTIATE_TEST_SUITE_P(Serial, TsFileWriterTableTest,
+ ::testing::Values(false));
+INSTANTIATE_TEST_SUITE_P(Parallel, TsFileWriterTableTest,
+ ::testing::Values(true));
\ No newline at end of file