This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch tablet_refine in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 3322a03c97499034a8364c7d3e42f21fb08da255 Author: ColinLee <[email protected]> AuthorDate: Fri Mar 20 00:36:10 2026 +0800 refine tabet writing. --- cpp/src/common/tablet.cc | 68 ++++++++++++++++++++++++++++++++---- cpp/src/common/tablet.h | 67 ++++++++++++++++++++++++++++++++++- cpp/src/writer/tsfile_writer.cc | 77 ++++++++++++++++++++++++++++++++--------- cpp/src/writer/tsfile_writer.h | 5 +-- 4 files changed, 190 insertions(+), 27 deletions(-) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 10489f67d..ea5faf5fd 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -85,8 +85,9 @@ int Tablet::init() { case BLOB: case TEXT: case STRING: { - value_matrix_[c].string_data = - (common::String*)malloc(sizeof(String) * max_row_num_); + auto* sc = new StringColumn(); + sc->init(max_row_num_, max_row_num_ * 32); + value_matrix_[c].string_col = sc; break; } default: @@ -99,6 +100,7 @@ int Tablet::init() { for (size_t c = 0; c < schema_count; c++) { bitmaps_[c].init(max_row_num_, false); } + return E_OK; } @@ -132,7 +134,8 @@ void Tablet::destroy() { case BLOB: case TEXT: case STRING: - free(value_matrix_[c].string_data); + value_matrix_[c].string_col->destroy(); + delete value_matrix_[c].string_col; break; default: break; @@ -197,8 +200,7 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, return &double_values[row_index]; } case STRING: { - auto string_values = column_values.string_data; - return &string_values[row_index]; + return &column_values.string_col->get_string_view(row_index); } default: return nullptr; @@ -208,8 +210,8 @@ void* Tablet::get_value(int row_index, uint32_t schema_index, template <> void Tablet::process_val(uint32_t row_index, uint32_t schema_index, common::String str) { - value_matrix_[schema_index].string_data[row_index].dup_from(str, - page_arena_); + value_matrix_[schema_index].string_col->append(row_index, str.buf_, + str.len_); bitmaps_[schema_index].clear(row_index); /* mark as non-null */ } @@ -348,6 +350,58 @@ void Tablet::set_column_categories( } } +void Tablet::reset_string_columns() { + size_t schema_count = schema_vec_->size(); + for (size_t c = 0; c < schema_count; c++) { + const MeasurementSchema& schema = schema_vec_->at(c); + if (schema.data_type_ == STRING || schema.data_type_ == TEXT || + schema.data_type_ == BLOB) { + value_matrix_[c].string_col->reset(); + } + } + page_arena_.reset(); +} + +std::vector<uint32_t> Tablet::find_all_device_boundaries() const { + const uint32_t row_count = get_cur_row_size(); + if (row_count <= 1) return {}; + + // Use uint64_t bitmap instead of vector<bool> for faster set/test/scan. + const uint32_t nwords = (row_count + 63) / 64; + std::vector<uint64_t> boundary(nwords, 0); + + for (auto col_idx : id_column_indexes_) { + const StringColumn& sc = *value_matrix_[col_idx].string_col; + const uint32_t* off = sc.offsets; + const char* buf = sc.buffer; + for (uint32_t i = 1; i < row_count; i++) { + if (boundary[i >> 6] & (1ULL << (i & 63))) continue; + uint32_t len_a = off[i] - off[i - 1]; + uint32_t len_b = off[i + 1] - off[i]; + if (len_a != len_b || + (len_a > 0 && + memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) { + boundary[i >> 6] |= (1ULL << (i & 63)); + } + } + } + + // Collect boundary positions using bitscan + std::vector<uint32_t> result; + for (uint32_t w = 0; w < nwords; w++) { + uint64_t bits = boundary[w]; + while (bits) { + uint32_t bit = __builtin_ctzll(bits); + uint32_t idx = w * 64 + bit; + if (idx > 0 && idx < row_count) { + result.push_back(idx); + } + bits &= bits - 1; // clear lowest set bit + } + } + return result; +} + std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const { std::vector<std::string*> id_array; id_array.push_back(new std::string(insert_target_name_)); diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 04fee7643..c1f10b593 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -46,6 +46,68 @@ class TabletColIterator; * with their associated metadata such as column names and types. */ class Tablet { + // Arrow-style string column: offsets + contiguous buffer. + // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i] + struct StringColumn { + uint32_t* offsets; // length: max_rows + 1 + char* buffer; // contiguous string data + uint32_t buf_capacity; // allocated buffer size + uint32_t buf_used; // bytes written so far + + StringColumn() + : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) {} + + void init(uint32_t max_rows, uint32_t init_buf_capacity) { + offsets = (uint32_t*)malloc(sizeof(uint32_t) * (max_rows + 1)); + offsets[0] = 0; + buf_capacity = init_buf_capacity; + buffer = (char*)malloc(buf_capacity); + buf_used = 0; + } + + void destroy() { + free(offsets); + offsets = nullptr; + free(buffer); + buffer = nullptr; + buf_capacity = buf_used = 0; + } + + void reset() { + buf_used = 0; + if (offsets) offsets[0] = 0; + } + + void append(uint32_t row, const char* data, uint32_t len) { + // Grow buffer if needed + while (buf_used + len > buf_capacity) { + buf_capacity = buf_capacity * 2 + len; + buffer = (char*)realloc(buffer, buf_capacity); + } + memcpy(buffer + buf_used, data, len); + offsets[row] = buf_used; + offsets[row + 1] = buf_used + len; + buf_used += len; + } + + const char* get_str(uint32_t row) const { + return buffer + offsets[row]; + } + uint32_t get_len(uint32_t row) const { + return offsets[row + 1] - offsets[row]; + } + // Return a String view for a given row. The returned reference is + // valid until the next call to get_string_view on this column. + common::String& get_string_view(uint32_t row) { + view_cache_.buf_ = buffer + offsets[row]; + view_cache_.len_ = offsets[row + 1] - offsets[row]; + return view_cache_; + } + + private: + common::String view_cache_; + }; + struct ValueMatrixEntry { union { int32_t* int32_data; @@ -53,7 +115,7 @@ class Tablet { float* float_data; double* double_data; bool* bool_data; - common::String* string_data; + StringColumn* string_col; }; }; @@ -201,6 +263,7 @@ class Tablet { void set_column_categories( const std::vector<common::ColumnCategory>& column_categories); std::shared_ptr<IDeviceID> get_device_id(int i) const; + std::vector<uint32_t> find_all_device_boundaries() const; /** * @brief Template function to add a value of type T to the specified row * and column by name. @@ -234,6 +297,8 @@ class Tablet { schema_map_ = schema_map; } + void reset_string_columns(); + friend class TabletColIterator; friend class TsFileWriter; friend struct MeasurementNamesFromTablet; diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 9a087a82f..891fb3cb9 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -934,6 +934,10 @@ int TsFileWriter::write_table(Tablet& tablet) { } } record_count_since_last_flush_ += tablet.cur_row_size_; + // Release string memory accumulated during this write. + // The page_arena_ holds all dup_from'd String buffers which are no longer + // needed after the data has been encoded into chunks. + tablet.reset_string_columns(); ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -941,10 +945,11 @@ int TsFileWriter::write_table(Tablet& tablet) { std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> TsFileWriter::split_tablet_by_device(const Tablet& tablet) { std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> result; - std::shared_ptr<IDeviceID> last_device_id = - std::make_shared<StringArrayDeviceID>("last_device_id"); + if (tablet.id_column_indexes_.empty()) { - result.emplace_back(std::move(last_device_id), 0); + // No ID columns — entire tablet is one device + auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id"); + result.emplace_back(std::move(sentinel), 0); std::vector<std::string*> id_array; id_array.push_back(new std::string(tablet.insert_target_name_)); auto res = std::make_shared<StringArrayDeviceID>(id_array); @@ -953,14 +958,26 @@ TsFileWriter::split_tablet_by_device(const Tablet& tablet) { return result; } - for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) { - std::shared_ptr<IDeviceID> cur_device_id(tablet.get_device_id(i)); - if (*cur_device_id != *last_device_id) { - result.emplace_back(std::move(last_device_id), i); - last_device_id = std::move(cur_device_id); - } + const uint32_t row_count = tablet.get_cur_row_size(); + if (row_count == 0) return result; + + // Sentinel entry (end_idx == 0, will be skipped by caller) + auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id"); + result.emplace_back(std::move(sentinel), 0); + + // Column-oriented scan: find all boundaries, then construct DeviceID + // only once per device segment. + auto boundaries = tablet.find_all_device_boundaries(); + + uint32_t seg_start = 0; + for (uint32_t b : boundaries) { + std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(dev_id), b); + seg_start = b; } - result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size()); + // Last segment + std::shared_ptr<IDeviceID> last_id(tablet.get_device_id(seg_start)); + result.emplace_back(std::move(last_id), row_count); return result; } @@ -996,7 +1013,7 @@ int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet, col_notnull_bitmap, start_idx, end_idx); } else if (data_type == common::STRING) { ret = - write_typed_column(chunk_writer, timestamps, col_values.string_data, + write_typed_column(chunk_writer, timestamps, col_values.string_col, col_notnull_bitmap, start_idx, end_idx); } else { ASSERT(false); @@ -1061,8 +1078,8 @@ int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer, case common::TEXT: case common::BLOB: ret = write_typed_column(value_chunk_writer, timestamps, - (common::String*)col_values.string_data, - col_notnull_bitmap, start_idx, end_idx); + col_values.string_col, col_notnull_bitmap, + start_idx, end_idx); break; default: ret = E_NOT_SUPPORT; @@ -1140,10 +1157,21 @@ int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + if (LIKELY(!col_notnull_bitmap.test(r))) { + common::String val( + string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (RET_FAIL(chunk_writer->write(timestamps[r], val))) { + return ret; + } + } + } + return ret; } int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, @@ -1183,10 +1211,25 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx) { - DO_VALUE_WRITE_TYPED_COLUMN(); + int ret = E_OK; + for (uint32_t r = start_idx; r < end_idx; r++) { + common::String val(string_col->buffer + string_col->offsets[r], + string_col->offsets[r + 1] - string_col->offsets[r]); + if (LIKELY(col_notnull_bitmap.test(r))) { + if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) { + return ret; + } + } else { + if (RET_FAIL( + value_chunk_writer->write(timestamps[r], val, false))) { + return ret; + } + } + } + return ret; } // TODO make sure ret is meaningful to SDK user diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 85c47db7f..5e18ea2a9 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -137,7 +137,7 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps, - common::String* col_values, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); @@ -198,7 +198,8 @@ class TsFileWriter { common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx); int write_typed_column(ValueChunkWriter* value_chunk_writer, - int64_t* timestamps, common::String* col_values, + int64_t* timestamps, + Tablet::StringColumn* string_col, common::BitMap& col_notnull_bitmap, uint32_t start_idx, uint32_t end_idx);
