This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch write_with_parallel in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 38b7336e0563d6526c3322740d1be31800d8e60e Author: ColinLee <[email protected]> AuthorDate: Thu Apr 9 21:03:22 2026 +0800 support write parallel. --- cpp/CMakeLists.txt | 7 + cpp/src/common/config/config.h | 2 + cpp/src/common/global.cc | 3 + cpp/src/common/global.h | 14 + cpp/src/common/tablet.cc | 131 ++++++++ cpp/src/common/tablet.h | 5 + cpp/src/common/thread_pool.h | 122 +++++++ cpp/src/writer/tsfile_writer.cc | 373 +++++++-------------- cpp/src/writer/tsfile_writer.h | 9 +- .../writer/table_view/tsfile_writer_table_test.cc | 45 +-- 10 files changed, 436 insertions(+), 275 deletions(-) diff --git a/cpp/CMakeLists.txt b/cpp/CMakeLists.txt index 9d2015e4..cc50a178 100755 --- a/cpp/CMakeLists.txt +++ b/cpp/CMakeLists.txt @@ -150,6 +150,13 @@ if (ENABLE_ZLIB) add_definitions(-DENABLE_GZIP) endif() +option(ENABLE_THREADS "Enable multi-threaded read/write (requires pthreads)" ON) +message("cmake using: ENABLE_THREADS=${ENABLE_THREADS}") + +if (ENABLE_THREADS) + add_definitions(-DENABLE_THREADS) +endif() + option(ENABLE_SIMDE "Enable SIMDe (SIMD Everywhere)" OFF) message("cmake using: ENABLE_SIMDE=${ENABLE_SIMDE}") diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h index 81dad924..e2b2039a 100644 --- a/cpp/src/common/config/config.h +++ b/cpp/src/common/config/config.h @@ -46,6 +46,8 @@ typedef struct ConfigValue { TSEncoding double_encoding_type_; TSEncoding string_encoding_type_; CompressionType default_compression_type_; + bool parallel_write_enabled_; + int32_t write_thread_count_; // When true, aligned writer enforces page size limit strictly by // interleaving time/value writes and sealing pages together when any side // becomes full. 
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc index 91ecedda..721522c7 100644 --- a/cpp/src/common/global.cc +++ b/cpp/src/common/global.cc @@ -54,12 +54,15 @@ void init_config_value() { g_config_value_.int64_encoding_type_ = TS_2DIFF; g_config_value_.float_encoding_type_ = GORILLA; g_config_value_.double_encoding_type_ = GORILLA; + g_config_value_.string_encoding_type_ = PLAIN; // Default compression type is LZ4 #ifdef ENABLE_LZ4 g_config_value_.default_compression_type_ = LZ4; #else g_config_value_.default_compression_type_ = UNCOMPRESSED; #endif + g_config_value_.parallel_write_enabled_ = true; + g_config_value_.write_thread_count_ = 6; // Enforce aligned page size limits strictly by default. g_config_value_.strict_page_size_ = true; } diff --git a/cpp/src/common/global.h b/cpp/src/common/global.h index 7937e920..01be5a9c 100644 --- a/cpp/src/common/global.h +++ b/cpp/src/common/global.h @@ -163,6 +163,20 @@ FORCE_INLINE uint8_t get_global_compression() { return static_cast<uint8_t>(g_config_value_.default_compression_type_); } +FORCE_INLINE void set_parallel_write_enabled(bool enabled) { + g_config_value_.parallel_write_enabled_ = enabled; +} + +FORCE_INLINE bool get_parallel_write_enabled() { + return g_config_value_.parallel_write_enabled_; +} + +FORCE_INLINE int set_write_thread_count(int32_t count) { + if (count < 1 || count > 64) return E_INVALID_ARG; + g_config_value_.write_thread_count_ = count; + return E_OK; +} + extern int init_common(); extern bool is_timestamp_column_name(const char* time_col_name); extern void cols_to_json(ByteStream* byte_stream, diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 4088a692..6233abd7 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -19,7 +19,9 @@ #include "tablet.h" +#include <algorithm> #include <cstdlib> +#include <numeric> #include "allocator/alloc_base.h" #include "datatype/date_converter.h" @@ -491,6 +493,135 @@ void Tablet::set_column_categories( } 
} +namespace { + +template <typename T> +void permute_array(T* arr, const std::vector<uint32_t>& perm, uint32_t n) { + std::vector<T> tmp(n); + for (uint32_t i = 0; i < n; i++) tmp[i] = arr[perm[i]]; + std::copy(tmp.begin(), tmp.end(), arr); +} + +void permute_string_column(Tablet::StringColumn* sc, BitMap& bm, + const std::vector<uint32_t>& perm, uint32_t n, + uint32_t max_rows) { + Tablet::StringColumn tmp; + tmp.init(max_rows, sc->buf_used > 0 ? sc->buf_used : 64); + for (uint32_t i = 0; i < n; i++) { + uint32_t r = perm[i]; + if (bm.test(r)) { + // Null row — write a zero-length entry to keep offsets valid. + tmp.append(i, "", 0); + } else { + int32_t off = sc->offsets[r]; + uint32_t len = + static_cast<uint32_t>(sc->offsets[r + 1] - off); + tmp.append(i, sc->buffer + off, len); + } + } + // Swap contents. + std::swap(sc->offsets, tmp.offsets); + std::swap(sc->buffer, tmp.buffer); + std::swap(sc->buf_capacity, tmp.buf_capacity); + std::swap(sc->buf_used, tmp.buf_used); + tmp.destroy(); +} + +void permute_bitmap(BitMap& bm, const std::vector<uint32_t>& perm, + uint32_t n) { + if (!bm.get_bitmap()) return; + uint32_t size_bytes = bm.get_size(); + // Save original bits. + std::vector<char> orig(bm.get_bitmap(), bm.get_bitmap() + size_bytes); + // Clear all bits (= all non-null). + bm.clear_all(); + // Re-set null bits through the permutation. + for (uint32_t i = 0; i < n; i++) { + uint32_t src = perm[i]; + if (orig[src >> 3] & (1 << (src & 7))) { + bm.set(i); + } + } +} + +} // anonymous namespace + +void Tablet::sort_by_device() { + if (id_column_indexes_.empty() || cur_row_size_ <= 1) return; + + const uint32_t n = cur_row_size_; + + // Build permutation sorted by tag column values (stable sort keeps + // timestamp order within each device). 
+ std::vector<uint32_t> perm(n); + std::iota(perm.begin(), perm.end(), 0); + + std::stable_sort(perm.begin(), perm.end(), [this](uint32_t a, uint32_t b) { + for (int idx : id_column_indexes_) { + bool a_null = bitmaps_[idx].test(a); + bool b_null = bitmaps_[idx].test(b); + if (a_null != b_null) return a_null > b_null; // null sorts first (a precedes b when only a is null) + if (a_null) continue; // both null — equal on this column + const StringColumn& sc = *value_matrix_[idx].string_col; + int32_t a_off = sc.offsets[a]; + uint32_t a_len = static_cast<uint32_t>(sc.offsets[a + 1] - a_off); + int32_t b_off = sc.offsets[b]; + uint32_t b_len = static_cast<uint32_t>(sc.offsets[b + 1] - b_off); + uint32_t min_len = std::min(a_len, b_len); + int cmp = (min_len > 0) + ? memcmp(sc.buffer + a_off, sc.buffer + b_off, min_len) + : 0; + if (cmp != 0) return cmp < 0; + if (a_len != b_len) return a_len < b_len; + } + return false; + }); + + // Check if already sorted. + bool sorted = true; + for (uint32_t i = 0; i < n && sorted; i++) { + if (perm[i] != i) sorted = false; + } + if (sorted) return; + + // Apply permutation to timestamps. + permute_array(timestamps_, perm, n); + + // Apply permutation to each column. 
+ uint32_t col_count = static_cast<uint32_t>(schema_vec_->size()); + for (uint32_t c = 0; c < col_count; c++) { + TSDataType dt = schema_vec_->at(c).data_type_; + switch (dt) { + case BOOLEAN: + permute_array(value_matrix_[c].bool_data, perm, n); + break; + case INT32: + case DATE: + permute_array(value_matrix_[c].int32_data, perm, n); + break; + case INT64: + case TIMESTAMP: + permute_array(value_matrix_[c].int64_data, perm, n); + break; + case FLOAT: + permute_array(value_matrix_[c].float_data, perm, n); + break; + case DOUBLE: + permute_array(value_matrix_[c].double_data, perm, n); + break; + case STRING: + case TEXT: + case BLOB: + permute_string_column(value_matrix_[c].string_col, bitmaps_[c], + perm, n, max_row_num_); + break; + default: + break; + } + permute_bitmap(bitmaps_[c], perm, n); + } +} + void Tablet::reset_string_columns() { size_t schema_count = schema_vec_->size(); for (size_t c = 0; c < schema_count; c++) { diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index beedacc0..a6bebae9 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,7 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + public: // Arrow-style string column: offsets + contiguous buffer. // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] struct StringColumn { @@ -284,6 +285,10 @@ class Tablet { void set_column_categories( const std::vector<common::ColumnCategory>& column_categories); + // Sort rows so that rows belonging to the same device (same TAG column + // values) are contiguous. Stable sort: preserves timestamp order within + // each device. No-op when there are no TAG columns or ≤1 rows. 
+ void sort_by_device(); std::shared_ptr<IDeviceID> get_device_id(int i) const; std::vector<uint32_t> find_all_device_boundaries() const; diff --git a/cpp/src/common/thread_pool.h b/cpp/src/common/thread_pool.h new file mode 100644 index 00000000..9285d4ff --- /dev/null +++ b/cpp/src/common/thread_pool.h @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#ifndef COMMON_THREAD_POOL_H +#define COMMON_THREAD_POOL_H + +#ifdef ENABLE_THREADS + +#include <condition_variable> +#include <functional> +#include <future> +#include <mutex> +#include <queue> +#include <thread> +#include <vector> + +namespace common { + +// Unified fixed-size thread pool supporting both fire-and-forget tasks +// (submit void + wait_all) and future-returning tasks (submit<F>). +// Used by both write path (column-parallel encoding) and read path +// (column-parallel decoding). 
+class ThreadPool { + public: + explicit ThreadPool(size_t num_threads) : stop_(false), active_(0) { + for (size_t i = 0; i < (num_threads > 0 ? num_threads : 1); i++) { // always start at least one worker: with zero workers queued tasks would never run and future.get()/wait_all() would block forever + workers_.emplace_back([this] { worker_loop(); }); + } + } + + ~ThreadPool() { + { + std::lock_guard<std::mutex> lk(mu_); + stop_ = true; + } + cv_work_.notify_all(); + for (auto& w : workers_) { + if (w.joinable()) w.join(); + } + } + + // Submit a fire-and-forget task (no return value); pair with wait_all(). + void submit(std::function<void()> task) { + { + std::lock_guard<std::mutex> lk(mu_); + tasks_.push(std::move(task)); + active_++; + } + cv_work_.notify_one(); + } + + // Submit a callable and get its result (or exception) through a std::future. + template <typename F> + std::future<typename std::result_of<F()>::type> submit(F&& f) { + using RetType = typename std::result_of<F()>::type; + auto task = + std::make_shared<std::packaged_task<RetType()>>(std::forward<F>(f)); + std::future<RetType> result = task->get_future(); + { + std::lock_guard<std::mutex> lk(mu_); + tasks_.emplace([task]() { (*task)(); }); + active_++; + } + cv_work_.notify_one(); + return result; + } + + // Block until all submitted tasks have completed. 
+ void wait_all() { + std::unique_lock<std::mutex> lk(mu_); + cv_done_.wait(lk, [this] { return active_ == 0 && tasks_.empty(); }); + } + + private: + void worker_loop() { + while (true) { + std::function<void()> task; + { + std::unique_lock<std::mutex> lk(mu_); + cv_work_.wait(lk, [this] { return stop_ || !tasks_.empty(); }); + if (stop_ && tasks_.empty()) return; + task = std::move(tasks_.front()); + tasks_.pop(); + } + task(); + { + std::lock_guard<std::mutex> lk(mu_); + active_--; + } + cv_done_.notify_all(); // wake every thread blocked in wait_all(), not just one of them + } + } + + std::vector<std::thread> workers_; + std::queue<std::function<void()>> tasks_; + std::mutex mu_; + std::condition_variable cv_work_; + std::condition_variable cv_done_; + bool stop_; + int active_; +}; + +} // namespace common + +#endif // ENABLE_THREADS + +#endif // COMMON_THREAD_POOL_H diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 657fcabc..c69915ba 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -1136,280 +1136,145 @@ int TsFileWriter::write_table(Tablet& tablet) { return ret; } + // Sort tablet so same-device rows are contiguous. 
+ tablet.sort_by_device(); + auto device_id_end_index_pairs = split_tablet_by_device(tablet); int start_idx = 0; for (auto& device_id_end_index_pair : device_id_end_index_pairs) { auto device_id = device_id_end_index_pair.first; int end_idx = device_id_end_index_pair.second; if (end_idx == 0) continue; - if (table_aligned_) { - SimpleVector<ValueChunkWriter*> value_chunk_writers; - TimeChunkWriter* time_chunk_writer = nullptr; - if (RET_FAIL(do_check_schema_table(device_id, tablet, - time_chunk_writer, - value_chunk_writers))) { - return ret; - } - const bool strict_page_size = - common::g_config_value_.strict_page_size_; - - std::vector<uint32_t> field_columns; - field_columns.reserve(tablet.get_column_count()); - for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { - if (tablet.column_categories_[col] == - common::ColumnCategory::FIELD) { - field_columns.push_back(col); - } - } - ASSERT(field_columns.size() == value_chunk_writers.size()); - - const bool has_varlen_field_column = [&]() { - for (uint32_t i = 0; i < field_columns.size(); i++) { - const common::TSDataType t = - tablet.schema_vec_->at(field_columns[i]).data_type_; - if (t == common::STRING || t == common::TEXT || - t == common::BLOB) { - return true; - } - } - return false; - }(); + SimpleVector<ValueChunkWriter*> value_chunk_writers; + TimeChunkWriter* time_chunk_writer = nullptr; + if (RET_FAIL(do_check_schema_table(device_id, tablet, + time_chunk_writer, + value_chunk_writers))) { + return ret; + } - // Keep writers' seal-check behavior consistent across calls. 
- time_chunk_writer->set_enable_page_seal_if_full(strict_page_size); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - strict_page_size); - } + std::vector<uint32_t> field_columns; + field_columns.reserve(tablet.get_column_count()); + for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { + if (tablet.column_categories_[col] == + common::ColumnCategory::FIELD) { + field_columns.push_back(col); } + } + ASSERT(field_columns.size() == value_chunk_writers.size()); - if (strict_page_size) { - // Strict: row-based insertion and force aligned page sealing - // when either time or any value page becomes full. - for (int i = start_idx; i < end_idx; i++) { - int32_t time_pages_before = - time_chunk_writer->num_of_pages(); - std::vector<int32_t> value_pages_before( - value_chunk_writers.size(), 0); - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - if (!IS_NULL(value_chunk_writers[k])) { - value_pages_before[k] = - value_chunk_writers[k]->num_of_pages(); - } - } - - if (RET_FAIL( - time_chunk_writer->write(tablet.timestamps_[i]))) { - return ret; - } - - for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - const uint32_t tablet_col_idx = field_columns[k]; - if (RET_FAIL(value_write_column(value_chunk_writer, - tablet, tablet_col_idx, - i, i + 1))) { - return ret; - } - } - - if (RET_FAIL(maybe_seal_aligned_pages_together( - time_chunk_writer, value_chunk_writers, - time_pages_before, value_pages_before))) { - return ret; - } - } - } else if (!has_varlen_field_column) { - // Optimization: no string/blob/text columns, so we can - // segment by point-number and seal pages at those boundaries - // in column-based order. 
- const uint32_t points_per_page = - common::g_config_value_.page_writer_max_point_num_; - - time_chunk_writer->set_enable_page_seal_if_full(false); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - false); - } - } - - // Fill the already-unsealed time page first. - uint32_t time_cur_points = time_chunk_writer->get_point_numer(); - if (time_cur_points >= points_per_page && - time_chunk_writer->has_current_page_data()) { - if (RET_FAIL(time_chunk_writer->seal_current_page())) { - return ret; - } - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c]) && - value_chunk_writers[c]->has_current_page_data()) { - if (RET_FAIL(value_chunk_writers[c] - ->seal_current_page())) { - return ret; - } - } - } - time_cur_points = 0; - } - - const uint32_t first_seg_len = - (time_cur_points > 0 && time_cur_points < points_per_page) - ? (points_per_page - time_cur_points) - : points_per_page; - - // 1) Write time in segments (seal all full segments). - uint32_t seg_start = static_cast<uint32_t>(start_idx); - uint32_t seg_len = first_seg_len; - while (static_cast<int>(seg_start) < end_idx) { - const uint32_t seg_end = std::min( - seg_start + seg_len, static_cast<uint32_t>(end_idx)); - if (RET_FAIL(time_write_column(time_chunk_writer, tablet, - seg_start, seg_end))) { - return ret; - } - seg_start = seg_end; - if (static_cast<int>(seg_start) < end_idx) { - if (RET_FAIL(time_chunk_writer->seal_current_page())) { - return ret; - } - } - seg_len = points_per_page; - } - - // 2) Write each value column (same segments). 
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - seg_start = static_cast<uint32_t>(start_idx); - seg_len = first_seg_len; - while (static_cast<int>(seg_start) < end_idx) { - const uint32_t seg_end = - std::min(seg_start + seg_len, - static_cast<uint32_t>(end_idx)); - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, seg_end))) { - return ret; - } - seg_start = seg_end; - if (static_cast<int>(seg_start) < end_idx) { - if (value_chunk_writer->has_current_page_data() && - RET_FAIL( - value_chunk_writer->seal_current_page())) { - return ret; - } - } - seg_len = points_per_page; - } - } - } else { - // General non-strict (may have varlen STRING/TEXT/BLOB - // columns): time auto-seals to provide aligned page boundaries; - // value writers skip auto page sealing and are sealed manually - // at recorded time-page boundaries. Attention: since value-side - // auto-seal is disabled, if a varlen value page hits the memory - // threshold earlier, it may not seal immediately and will be - // sealed later at the time-page boundaries (non-strict - // sacrifices the strict page size/memory limit for - // performance). - time_chunk_writer->set_enable_page_seal_if_full(true); - for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { - if (!IS_NULL(value_chunk_writers[c])) { - value_chunk_writers[c]->set_enable_page_seal_if_full( - false); - } - } + // Precompute page boundaries from point counts — no serial write + // needed. The first segment may be shorter if the time page already + // holds data from a previous write_table call. 
+ const uint32_t page_max_points = std::max<uint32_t>( + 1, common::g_config_value_.page_writer_max_point_num_); + const uint32_t si = static_cast<uint32_t>(start_idx); + const uint32_t ei = static_cast<uint32_t>(end_idx); - std::vector<uint32_t> time_page_row_ends; - const uint32_t page_max_points = std::max<uint32_t>( - 1, common::g_config_value_.page_writer_max_point_num_); - const uint32_t batch_rows = - static_cast<uint32_t>(end_idx - start_idx); - time_page_row_ends.reserve(batch_rows / page_max_points + 1); - for (uint32_t r = static_cast<uint32_t>(start_idx); - r < static_cast<uint32_t>(end_idx); r++) { - const int32_t pages_before = - time_chunk_writer->num_of_pages(); - if (RET_FAIL( - time_chunk_writer->write(tablet.timestamps_[r]))) { - return ret; - } - const int32_t pages_after = - time_chunk_writer->num_of_pages(); - if (pages_after > pages_before) { - const uint32_t boundary_end = r + 1; - if (time_page_row_ends.empty() || - time_page_row_ends.back() != boundary_end) { - time_page_row_ends.push_back(boundary_end); - } - } + uint32_t time_cur_points = time_chunk_writer->get_point_numer(); + const uint32_t first_seg_cap = + (time_cur_points > 0 && time_cur_points < page_max_points) + ? (page_max_points - time_cur_points) + : page_max_points; + + std::vector<uint32_t> page_boundaries; // row indices where a page + // should seal + { + uint32_t pos = si; + uint32_t seg_cap = first_seg_cap; + while (pos < ei) { + uint32_t seg_end = std::min(pos + seg_cap, ei); + if (seg_end < ei) { + page_boundaries.push_back(seg_end); } + pos = seg_end; + seg_cap = page_max_points; + } + } - // Write values column-by-column and seal at recorded time - // boundaries. 
- for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { - ValueChunkWriter* value_chunk_writer = - value_chunk_writers[k]; - if (IS_NULL(value_chunk_writer)) { - continue; - } - uint32_t seg_start = static_cast<uint32_t>(start_idx); - for (uint32_t boundary_end : time_page_row_ends) { - if (boundary_end <= seg_start) { - continue; - } - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, boundary_end))) { - return ret; - } - if (value_chunk_writer->has_current_page_data() && - RET_FAIL(value_chunk_writer->seal_current_page())) { - return ret; - } - seg_start = boundary_end; - } - if (seg_start < static_cast<uint32_t>(end_idx)) { - if (RET_FAIL(value_write_column( - value_chunk_writer, tablet, field_columns[k], - seg_start, static_cast<uint32_t>(end_idx)))) { - return ret; - } - } - } + // Write one column in segments defined by page_boundaries, sealing + // at each boundary. Works for both time and value columns. + auto write_time_in_segments = + [this, &tablet, &page_boundaries, si, ei]( + TimeChunkWriter* tcw) -> int { + int r = E_OK; + tcw->set_enable_page_seal_if_full(false); + uint32_t seg_start = si; + for (uint32_t boundary : page_boundaries) { + if ((r = time_write_column(tcw, tablet, seg_start, + boundary)) != E_OK) + return r; + if ((r = tcw->seal_current_page()) != E_OK) return r; + seg_start = boundary; } - start_idx = end_idx; - } else { - MeasurementNamesFromTablet mnames_getter(tablet); - SimpleVector<ChunkWriter*> chunk_writers; - SimpleVector<common::TSDataType> data_types; - if (RET_FAIL(do_check_schema(device_id, mnames_getter, - chunk_writers, data_types))) { + if (seg_start < ei) { + r = time_write_column(tcw, tablet, seg_start, ei); + } + tcw->set_enable_page_seal_if_full(true); + return r; + }; + + auto write_value_in_segments = + [this, &tablet, &page_boundaries, si, ei]( + ValueChunkWriter* vcw, uint32_t col_idx) -> int { + int r = E_OK; + vcw->set_enable_page_seal_if_full(false); + uint32_t 
seg_start = si; + for (uint32_t boundary : page_boundaries) { + if ((r = value_write_column(vcw, tablet, col_idx, seg_start, + boundary)) != E_OK) + return r; + if (vcw->has_current_page_data() && + (r = vcw->seal_current_page()) != E_OK) + return r; + seg_start = boundary; + } + if (seg_start < ei) { + r = value_write_column(vcw, tablet, col_idx, seg_start, ei); + } + vcw->set_enable_page_seal_if_full(true); + return r; + }; + + // All columns (time + values) write the same row segments and seal + // at the same boundaries — fully parallel. +#ifdef ENABLE_THREADS + if (g_config_value_.parallel_write_enabled_) { + std::vector<std::future<int>> futures; + futures.push_back(thread_pool_.submit( + [&write_time_in_segments, time_chunk_writer]() { + return write_time_in_segments(time_chunk_writer); + })); + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + ValueChunkWriter* vcw = value_chunk_writers[k]; + if (IS_NULL(vcw)) continue; + uint32_t col_idx = field_columns[k]; + futures.push_back(thread_pool_.submit( + [&write_value_in_segments, vcw, col_idx]() { + return write_value_in_segments(vcw, col_idx); + })); + } + for (auto& f : futures) { + int r = f.get(); + if (r != E_OK && ret == E_OK) ret = r; + } + if (ret != E_OK) return ret; + } else +#endif + { + if (RET_FAIL(write_time_in_segments(time_chunk_writer))) { return ret; } - ASSERT(chunk_writers.size() == tablet.get_column_count()); - for (uint32_t c = 0; c < chunk_writers.size(); c++) { - ChunkWriter* chunk_writer = chunk_writers[c]; - if (IS_NULL(chunk_writer)) { - continue; - } - if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx, - device_id_end_index_pair.second))) { + for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { + ValueChunkWriter* vcw = value_chunk_writers[k]; + if (IS_NULL(vcw)) continue; + if (RET_FAIL(write_value_in_segments(vcw, field_columns[k]))) { return ret; } } - start_idx = device_id_end_index_pair.second; } + start_idx = end_idx; } 
record_count_since_last_flush_ += tablet.cur_row_size_; // Reset string column buffers so the tablet can be reused for the next diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 01028e2e..87646bc7 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -22,6 +22,7 @@ #include <fcntl.h> #include <climits> +#include <future> #include <map> #include <memory> #include <string> @@ -32,6 +33,9 @@ #include "common/record.h" #include "common/schema.h" #include "common/tablet.h" +#ifdef ENABLE_THREADS +#include "common/thread_pool.h" +#endif namespace storage { class WriteFile; @@ -194,7 +198,10 @@ class TsFileWriter { int64_t record_count_for_next_mem_check_; bool write_file_created_; bool io_writer_owned_; // false when init(RestorableTsFileIOWriter*) - bool table_aligned_ = true; +#ifdef ENABLE_THREADS + common::ThreadPool thread_pool_{ + (size_t)common::g_config_value_.write_thread_count_}; +#endif int write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, bool* col_values, diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index 477f875e..52f64736 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -26,15 +26,17 @@ #include "file/tsfile_io_writer.h" #include "file/write_file.h" #include "reader/tsfile_reader.h" +#include "common/global.h" #include "writer/chunk_writer.h" #include "writer/tsfile_table_writer.h" using namespace storage; using namespace common; -class TsFileWriterTableTest : public ::testing::Test { +class TsFileWriterTableTest : public ::testing::TestWithParam<bool> { protected: void SetUp() override { libtsfile_init(); + g_config_value_.parallel_write_enabled_ = GetParam(); file_name_ = std::string("tsfile_writer_table_test_") + generate_random_string(10) + std::string(".tsfile"); remove(file_name_.c_str()); @@ -133,7 
+135,7 @@ class TsFileWriterTableTest : public ::testing::Test { } }; -TEST_F(TsFileWriterTableTest, WriteTableTest) { +TEST_P(TsFileWriterTableTest, WriteTableTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(&write_file_, table_schema); @@ -144,7 +146,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) { +TEST_P(TsFileWriterTableTest, WithoutTagAndMultiPage) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; measurement_schemas.resize(1); @@ -192,7 +194,7 @@ TEST_F(TsFileWriterTableTest, WithoutTagAndMultiPage) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteDisorderTest) { +TEST_P(TsFileWriterTableTest, WriteDisorderTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(&write_file_, table_schema); @@ -242,7 +244,7 @@ TEST_F(TsFileWriterTableTest, WriteDisorderTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) { +TEST_P(TsFileWriterTableTest, WriteTableTestMultiFlush) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>( &write_file_, table_schema, 2 * 1024); @@ -255,7 +257,7 @@ TEST_F(TsFileWriterTableTest, WriteTableTestMultiFlush) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) { +TEST_P(TsFileWriterTableTest, WriteNonExistColumnTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(&write_file_, table_schema); @@ -283,7 +285,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistColumnTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) { +TEST_P(TsFileWriterTableTest, WriteNonExistTableTest) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = 
std::make_shared<TsFileTableWriter>(&write_file_, table_schema); @@ -295,7 +297,7 @@ TEST_F(TsFileWriterTableTest, WriteNonExistTableTest) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) { +TEST_P(TsFileWriterTableTest, WriterWithMemoryThreshold) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>( &write_file_, table_schema, 256 * 1024 * 1024); @@ -305,7 +307,7 @@ TEST_F(TsFileWriterTableTest, WriterWithMemoryThreshold) { delete table_schema; } -TEST_F(TsFileWriterTableTest, EmptyTagWrite) { +TEST_P(TsFileWriterTableTest, EmptyTagWrite) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; measurement_schemas.resize(3); @@ -361,7 +363,7 @@ TEST_F(TsFileWriterTableTest, EmptyTagWrite) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) { +TEST_P(TsFileWriterTableTest, WritehDataTypeMisMatch) { auto table_schema = gen_table_schema(0); auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>( &write_file_, table_schema, 256 * 1024 * 1024); @@ -412,7 +414,7 @@ TEST_F(TsFileWriterTableTest, WritehDataTypeMisMatch) { tsfile_table_writer_->close(); } -TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { +TEST_P(TsFileWriterTableTest, WriteAndReadSimple) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; measurement_schemas.resize(2); @@ -467,7 +469,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) { delete table_schema; } -TEST_F(TsFileWriterTableTest, DuplicateColumnName) { +TEST_P(TsFileWriterTableTest, DuplicateColumnName) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; measurement_schemas.resize(3); @@ -505,7 +507,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { +TEST_P(TsFileWriterTableTest, 
WriteWithNullAndEmptyTag) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; for (int i = 0; i < 3; i++) { @@ -637,7 +639,7 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) { ASSERT_EQ(reader.close(), common::E_OK); } -TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { +TEST_P(TsFileWriterTableTest, MultiDeviceMultiFields) { common::config_set_max_degree_of_index_node(5); auto table_schema = gen_table_schema(0, 1, 100); auto tsfile_table_writer_ = @@ -696,7 +698,7 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { delete table_schema; } -TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { +TEST_P(TsFileWriterTableTest, WriteDataWithEmptyField) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; for (int i = 0; i < 3; i++) { @@ -773,7 +775,7 @@ TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { ASSERT_EQ(reader.close(), common::E_OK); } -TEST_F(TsFileWriterTableTest, MultiDatatypes) { +TEST_P(TsFileWriterTableTest, MultiDatatypes) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; @@ -877,7 +879,7 @@ TEST_F(TsFileWriterTableTest, MultiDatatypes) { delete[] literal; } -TEST_F(TsFileWriterTableTest, DiffCodecTypes) { +TEST_P(TsFileWriterTableTest, DiffCodecTypes) { std::vector<MeasurementSchema*> measurement_schemas; std::vector<ColumnCategory> column_categories; @@ -985,7 +987,7 @@ TEST_F(TsFileWriterTableTest, DiffCodecTypes) { delete[] literal; } -TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { +TEST_P(TsFileWriterTableTest, EncodingConfigIntegration) { // 1. 
Test setting global compression type ASSERT_EQ(E_OK, set_global_compression(SNAPPY)); @@ -1098,7 +1100,7 @@ TEST_F(TsFileWriterTableTest, EncodingConfigIntegration) { } #ifdef ENABLE_MEM_STAT -TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { +TEST_P(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { TableSchema* table_schema = gen_table_schema(0, 2, 3); auto tsfile_table_writer = std::make_shared<TsFileTableWriter>(&write_file_, table_schema); @@ -1172,4 +1174,7 @@ TEST_F(TsFileWriterTableTest, DISABLED_MemStatWriteAndVerify) { delete table_schema; } -#endif \ No newline at end of file +#endif + +INSTANTIATE_TEST_SUITE_P(Serial, TsFileWriterTableTest, ::testing::Values(false)); +INSTANTIATE_TEST_SUITE_P(Parallel, TsFileWriterTableTest, ::testing::Values(true)); \ No newline at end of file
