jt2594838 commented on code in PR #749:
URL: https://github.com/apache/tsfile/pull/749#discussion_r2980410263


##########
cpp/src/common/tablet.cc:
##########
@@ -85,8 +85,9 @@ int Tablet::init() {
             case BLOB:
             case TEXT:
             case STRING: {
-                value_matrix_[c].string_data =
-                    (common::String*)malloc(sizeof(String) * max_row_num_);
+                auto* sc = new StringColumn();
+                sc->init(max_row_num_, max_row_num_ * 32);
+                value_matrix_[c].string_col = sc;

Review Comment:
   Where does the 32 come from?



##########
cpp/src/common/allocator/byte_stream.h:
##########
@@ -292,6 +294,7 @@ class ByteStream {
         wrapped_page_.buf_ = (uint8_t*)buf;
 
         page_size_ = buf_len;
+        page_mask_ = buf_len - 1;

Review Comment:
   Is page_size_ guaranteed to always be a power of 2? Otherwise buf_len - 1 is not a valid mask.



##########
cpp/src/common/tablet.cc:
##########
@@ -163,6 +166,96 @@ int Tablet::add_timestamp(uint32_t row_index, int64_t 
timestamp) {
     return E_OK;
 }
 
+int Tablet::set_timestamps(const int64_t* timestamps, uint32_t count) {
+    if (err_code_ != E_OK) return err_code_;
+    ASSERT(timestamps_ != NULL);
+    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
+        return E_OUT_OF_RANGE;
+    }
+    std::memcpy(timestamps_, timestamps, count * sizeof(int64_t));
+    cur_row_size_ = std::max(count, cur_row_size_);
+    return E_OK;
+}
+
+int Tablet::set_column_values(uint32_t schema_index, const void* data,
+                              const uint8_t* bitmap, uint32_t count) {
+    if (err_code_ != E_OK) return err_code_;
+    if (UNLIKELY(schema_index >= schema_vec_->size())) return E_OUT_OF_RANGE;
+    if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_)))
+        return E_OUT_OF_RANGE;
+

Review Comment:
   This duplicates https://github.com/apache/tsfile/pull/747.
   
   Please refer to the comments in that PR and discuss with @hongzhi-gao to decide 
where the changes should go.



##########
cpp/src/common/tablet.cc:
##########
@@ -348,6 +440,57 @@ void Tablet::set_column_categories(
     }
 }
 
+void Tablet::reset_string_columns() {
+    size_t schema_count = schema_vec_->size();
+    for (size_t c = 0; c < schema_count; c++) {
+        const MeasurementSchema& schema = schema_vec_->at(c);
+        if (schema.data_type_ == STRING || schema.data_type_ == TEXT ||
+            schema.data_type_ == BLOB) {
+            value_matrix_[c].string_col->reset();
+        }
+    }
+}
+
+std::vector<uint32_t> Tablet::find_all_device_boundaries() const {
+    const uint32_t row_count = get_cur_row_size();
+    if (row_count <= 1) return {};
+
+    // Use uint64_t bitmap instead of vector<bool> for faster set/test/scan.
+    const uint32_t nwords = (row_count + 63) / 64;
+    std::vector<uint64_t> boundary(nwords, 0);
+
+    for (auto col_idx : id_column_indexes_) {
+        const StringColumn& sc = *value_matrix_[col_idx].string_col;
+        const uint32_t* off = sc.offsets;
+        const char* buf = sc.buffer;
+        for (uint32_t i = 1; i < row_count; i++) {
+            if (boundary[i >> 6] & (1ULL << (i & 63))) continue;
+            uint32_t len_a = off[i] - off[i - 1];
+            uint32_t len_b = off[i + 1] - off[i];
+            if (len_a != len_b ||
+                (len_a > 0 &&
+                 memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) {
+                boundary[i >> 6] |= (1ULL << (i & 63));
+            }
+        }
+    }

Review Comment:
   id_column_indexes_ -> tag_column_indexes.
   
   Consider scanning the indexes in reverse order, because in general the later 
tags are the ones most likely to change between rows.
   E.g., devices are more likely to have orders like:
   beijing.haidian.wf0001.wt0001
   beijing.haidian.wf0001.wt0002
   beijing.haidian.wf0001.wt0003
   beijing.haidian.wf0002.wt0001
   beijing.haidian.wf0002.wt0002
   beijing.haidian.wf0002.wt0003
   
   Also, we could break out early once the number of boundaries set reaches 
row_count - 1 (the scan starts at row 1, so no more bits can be set after that).
   
   Would we benefit from maintaining the boundaries incrementally while the tag 
columns are being inserted?



##########
cpp/src/common/tablet.cc:
##########
@@ -348,6 +440,57 @@ void Tablet::set_column_categories(
     }
 }
 
+void Tablet::reset_string_columns() {
+    size_t schema_count = schema_vec_->size();
+    for (size_t c = 0; c < schema_count; c++) {
+        const MeasurementSchema& schema = schema_vec_->at(c);
+        if (schema.data_type_ == STRING || schema.data_type_ == TEXT ||
+            schema.data_type_ == BLOB) {
+            value_matrix_[c].string_col->reset();
+        }
+    }
+}
+
+std::vector<uint32_t> Tablet::find_all_device_boundaries() const {
+    const uint32_t row_count = get_cur_row_size();
+    if (row_count <= 1) return {};
+
+    // Use uint64_t bitmap instead of vector<bool> for faster set/test/scan.
+    const uint32_t nwords = (row_count + 63) / 64;
+    std::vector<uint64_t> boundary(nwords, 0);
+
+    for (auto col_idx : id_column_indexes_) {
+        const StringColumn& sc = *value_matrix_[col_idx].string_col;
+        const uint32_t* off = sc.offsets;
+        const char* buf = sc.buffer;
+        for (uint32_t i = 1; i < row_count; i++) {
+            if (boundary[i >> 6] & (1ULL << (i & 63))) continue;
+            uint32_t len_a = off[i] - off[i - 1];
+            uint32_t len_b = off[i + 1] - off[i];
+            if (len_a != len_b ||
+                (len_a > 0 &&
+                 memcmp(buf + off[i - 1], buf + off[i], len_a) != 0)) {
+                boundary[i >> 6] |= (1ULL << (i & 63));
+            }
+        }
+    }
+
+    // Collect boundary positions using bitscan
+    std::vector<uint32_t> result;
+    for (uint32_t w = 0; w < nwords; w++) {
+        uint64_t bits = boundary[w];
+        while (bits) {
+            uint32_t bit = __builtin_ctzll(bits);
+            uint32_t idx = w * 64 + bit;
+            if (idx > 0 && idx < row_count) {
+                result.push_back(idx);
+            }
+            bits &= bits - 1;  // clear lowest set bit
+        }
+    }
+    return result;

Review Comment:
   Please add some comments here, ideally with a few examples.



##########
cpp/src/common/tablet.h:
##########
@@ -46,14 +46,78 @@ class TabletColIterator;
  * with their associated metadata such as column names and types.
  */
 class Tablet {
+    // Arrow-style string column: offsets + contiguous buffer.
+    // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
+    struct StringColumn {
+        uint32_t* offsets;      // length: max_rows + 1
+        char* buffer;           // contiguous string data
+        uint32_t buf_capacity;  // allocated buffer size
+        uint32_t buf_used;      // bytes written so far
+
+        StringColumn()
+            : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) 
{}
+
+        void init(uint32_t max_rows, uint32_t init_buf_capacity) {
+            offsets = (uint32_t*)common::mem_alloc(
+                sizeof(uint32_t) * (max_rows + 1), common::MOD_DEFAULT);
+            offsets[0] = 0;
+            buf_capacity = init_buf_capacity;
+            buffer =
+                (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT);
+            buf_used = 0;
+        }
+
+        void destroy() {
+            if (offsets) common::mem_free(offsets);
+            offsets = nullptr;
+            if (buffer) common::mem_free(buffer);
+            buffer = nullptr;
+            buf_capacity = buf_used = 0;
+        }
+
+        void reset() {
+            buf_used = 0;
+            if (offsets) offsets[0] = 0;

Review Comment:
   memset?



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -888,28 +890,72 @@ int TsFileWriter::write_table(Tablet& tablet) {
                                                value_chunk_writers))) {
                 return ret;
             }
-            for (int i = start_idx; i < end_idx; i++) {
-                if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) 
{
-                    return ret;
-                }
-            }
+
+            // Collect column tasks for parallel execution
+            struct ColTask {
+                ValueChunkWriter* writer;
+                uint32_t col_idx;
+            };
+            std::vector<ColTask> tasks;

Review Comment:
   Parallel column encoding may not be desirable when resources are limited.
   It would be better to add a compilation option or a configuration item to control it.



##########
cpp/src/encoding/plain_encoder.h:
##########
@@ -64,6 +71,101 @@ class PlainEncoder : public Encoder {
     }
 
     int get_max_byte_size() { return 0; }
+
+    // Optimized batch encoding: directly byte-swap into ByteStream page 
buffer.
+    // Avoids per-value write_buf overhead entirely — only calls acquire_buf()
+    // once per page boundary crossing.
+    int encode_batch(const int64_t* values, uint32_t count,
+                     common::ByteStream& out_stream) override {
+        if (count == 0) return common::E_OK;
+        uint32_t offset = 0;
+        while (offset < count) {
+            common::ByteStream::Buffer buf = out_stream.acquire_buf();
+            if (UNLIKELY(buf.buf_ == nullptr)) return common::E_OOM;
+            // How many int64 values fit in the remaining page space?
+            uint32_t capacity = buf.len_ / 8;
+            if (capacity == 0) {
+                // Page has < 8 bytes left, fall back to write_buf for this one
+                return Encoder::encode_batch(values + offset, count - offset,
+                                             out_stream);
+            }
+            uint32_t batch = std::min(count - offset, capacity);
+            uint8_t* dst = (uint8_t*)buf.buf_;
+            const int64_t* src = values + offset;
+            uint32_t i = 0;
+#if TSFILE_HAS_NEON
+            // NEON: byte-reverse 2 x int64 per iteration
+            for (; i + 2 <= batch; i += 2) {
+                uint8x16_t v = vld1q_u8((const uint8_t*)&src[i]);
+                v = vrev64q_u8(v);
+                vst1q_u8(dst, v);
+                dst += 16;
+            }
+#endif
+            // Scalar tail
+            for (; i < batch; i++) {
+                uint64_t v = (uint64_t)src[i];
+                dst[0] = (uint8_t)(v >> 56);
+                dst[1] = (uint8_t)(v >> 48);
+                dst[2] = (uint8_t)(v >> 40);
+                dst[3] = (uint8_t)(v >> 32);
+                dst[4] = (uint8_t)(v >> 24);
+                dst[5] = (uint8_t)(v >> 16);
+                dst[6] = (uint8_t)(v >> 8);
+                dst[7] = (uint8_t)(v);
+                dst += 8;
+            }
+            out_stream.buffer_used(batch * 8);
+            offset += batch;
+        }
+        return common::E_OK;
+    }
+
+    int encode_batch(const double* values, uint32_t count,
+                     common::ByteStream& out_stream) override {
+        return encode_batch(reinterpret_cast<const int64_t*>(values), count,
+                            out_stream);
+    }
+
+    int encode_batch(const float* values, uint32_t count,
+                     common::ByteStream& out_stream) override {

Review Comment:
   Where is the method for int32_t?



##########
cpp/src/writer/tsfile_writer.h:
##########
@@ -187,6 +189,7 @@ class TsFileWriter {
     bool write_file_created_;
     bool io_writer_owned_;  // false when init(RestorableTsFileIOWriter*)
     bool table_aligned_ = true;
+    common::ThreadPool thread_pool_{6};

Review Comment:
   Please add an item to the global configuration for the thread pool size and use it here instead of the hard-coded 6.



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -1183,10 +1271,160 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* 
value_chunk_writer,
 
 int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
                                      int64_t* timestamps,
-                                     common::String* col_values,
+                                     Tablet::StringColumn* string_col,
                                      common::BitMap& col_notnull_bitmap,
                                      uint32_t start_idx, uint32_t end_idx) {
-    DO_VALUE_WRITE_TYPED_COLUMN();
+    int ret = E_OK;
+    for (uint32_t r = start_idx; r < end_idx; r++) {
+        common::String val(string_col->buffer + string_col->offsets[r],
+                           string_col->offsets[r + 1] - 
string_col->offsets[r]);
+        if (LIKELY(col_notnull_bitmap.test(r))) {
+            if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) 
{
+                return ret;
+            }
+        } else {
+            if (RET_FAIL(
+                    value_chunk_writer->write(timestamps[r], val, false))) {
+                return ret;
+            }
+        }
+    }
+    return ret;
+}
+
+int TsFileWriter::time_write_column_batch(TimeChunkWriter* time_chunk_writer,
+                                          const Tablet& tablet,
+                                          uint32_t start_idx,
+                                          uint32_t end_idx) {
+    int64_t* timestamps = tablet.timestamps_;
+    int ret = E_OK;
+    if (IS_NULL(time_chunk_writer) || IS_NULL(timestamps)) {
+        return E_INVALID_ARG;
+    }
+    end_idx = std::min(end_idx, tablet.max_row_num_);
+    uint32_t count = end_idx - start_idx;
+    if (count == 0) return ret;
+    return time_chunk_writer->write_batch(timestamps + start_idx, count);
+}
+
+int TsFileWriter::write_column_batch(ChunkWriter* chunk_writer,
+                                      const Tablet& tablet, int col_idx,
+                                      uint32_t start_idx, uint32_t end_idx) {
+    int ret = E_OK;
+    common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
+    int64_t* timestamps = tablet.timestamps_;
+    Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
+    BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx];
+    end_idx = std::min(end_idx, tablet.max_row_num_);
+    uint32_t count = end_idx - start_idx;
+    if (count == 0) return ret;
+
+    // For non-aligned write, bitmap bit=0 means not null.
+    // We need to check if there are any nulls.
+    bool has_null = false;
+    for (uint32_t r = start_idx; r < end_idx; r++) {
+        if (col_notnull_bitmap.test(r)) {
+            has_null = true;

Review Comment:
   col_notnull_bitmap -> col_null_bitmap (a set bit marks a null here: has_null is set when test(r) is true).



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -1183,10 +1271,160 @@ int TsFileWriter::write_typed_column(ValueChunkWriter* 
value_chunk_writer,
 
 int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
                                      int64_t* timestamps,
-                                     common::String* col_values,
+                                     Tablet::StringColumn* string_col,
                                      common::BitMap& col_notnull_bitmap,
                                      uint32_t start_idx, uint32_t end_idx) {
-    DO_VALUE_WRITE_TYPED_COLUMN();
+    int ret = E_OK;
+    for (uint32_t r = start_idx; r < end_idx; r++) {
+        common::String val(string_col->buffer + string_col->offsets[r],
+                           string_col->offsets[r + 1] - 
string_col->offsets[r]);
+        if (LIKELY(col_notnull_bitmap.test(r))) {
+            if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) 
{
+                return ret;
+            }
+        } else {
+            if (RET_FAIL(
+                    value_chunk_writer->write(timestamps[r], val, false))) {
+                return ret;
+            }
+        }
+    }
+    return ret;
+}
+
+int TsFileWriter::time_write_column_batch(TimeChunkWriter* time_chunk_writer,
+                                          const Tablet& tablet,
+                                          uint32_t start_idx,
+                                          uint32_t end_idx) {
+    int64_t* timestamps = tablet.timestamps_;
+    int ret = E_OK;
+    if (IS_NULL(time_chunk_writer) || IS_NULL(timestamps)) {
+        return E_INVALID_ARG;
+    }
+    end_idx = std::min(end_idx, tablet.max_row_num_);
+    uint32_t count = end_idx - start_idx;
+    if (count == 0) return ret;
+    return time_chunk_writer->write_batch(timestamps + start_idx, count);
+}
+
+int TsFileWriter::write_column_batch(ChunkWriter* chunk_writer,
+                                      const Tablet& tablet, int col_idx,
+                                      uint32_t start_idx, uint32_t end_idx) {
+    int ret = E_OK;
+    common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
+    int64_t* timestamps = tablet.timestamps_;
+    Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
+    BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx];
+    end_idx = std::min(end_idx, tablet.max_row_num_);
+    uint32_t count = end_idx - start_idx;
+    if (count == 0) return ret;
+
+    // For non-aligned write, bitmap bit=0 means not null.
+    // We need to check if there are any nulls.
+    bool has_null = false;
+    for (uint32_t r = start_idx; r < end_idx; r++) {
+        if (col_notnull_bitmap.test(r)) {
+            has_null = true;

Review Comment:
   check other places



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -888,28 +890,72 @@ int TsFileWriter::write_table(Tablet& tablet) {
                                                value_chunk_writers))) {
                 return ret;
             }
-            for (int i = start_idx; i < end_idx; i++) {
-                if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) 
{
-                    return ret;
-                }
-            }
+
+            // Collect column tasks for parallel execution
+            struct ColTask {
+                ValueChunkWriter* writer;
+                uint32_t col_idx;
+            };
+            std::vector<ColTask> tasks;
             uint32_t field_col_count = 0;
             for (uint32_t i = 0; i < tablet.get_column_count(); ++i) {
                 if (tablet.column_categories_[i] ==
                     common::ColumnCategory::FIELD) {
-                    ValueChunkWriter* value_chunk_writer =
+                    ValueChunkWriter* vcw =
                         value_chunk_writers[field_col_count];
-                    if (IS_NULL(value_chunk_writer)) {
-                        continue;
+                    if (!IS_NULL(vcw)) {
+                        tasks.push_back({vcw, i});
                     }
+                    field_col_count++;
+                }
+            }
 
-                    if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
-                                                    i, start_idx, end_idx))) {
+            // Parallel encode: time column + all value columns concurrently.
+            // Each ChunkWriter has its own Encoder, Statistic, ByteStream —
+            // zero shared state, no locks needed.
+            const uint32_t si = start_idx;
+            const uint32_t ei = end_idx;
+
+            if (tasks.size() >= 2) {
+                // Launch time column + value columns in parallel via thread 
pool
+                auto time_future = thread_pool_.submit(
+                    [this, time_chunk_writer, &tablet, si, ei]() {
+                        return time_write_column_batch(time_chunk_writer,
+                                                       tablet, si, ei);
+                    });
+
+                std::vector<std::future<int>> val_futures;
+                for (size_t t = 0; t < tasks.size(); t++) {
+                    auto& task = tasks[t];
+                    val_futures.push_back(thread_pool_.submit(
+                        [this, &task, &tablet, si, ei]() {
+                            return value_write_column_batch(
+                                task.writer, tablet, task.col_idx, si, ei);
+                        }));
+                }
+
+                // Wait for all and check errors
+                ret = time_future.get();
+                if (ret != E_OK) return ret;
+                for (auto& f : val_futures) {
+                    int r = f.get();
+                    if (r != E_OK && ret == E_OK) ret = r;
+                }
+                if (ret != E_OK) return ret;
+            } else {

Review Comment:
   Consider always encoding the time column in the local thread to reduce overhead.



##########
cpp/src/common/tablet.h:
##########
@@ -46,14 +46,78 @@ class TabletColIterator;
  * with their associated metadata such as column names and types.
  */
 class Tablet {
+    // Arrow-style string column: offsets + contiguous buffer.
+    // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
+    struct StringColumn {
+        uint32_t* offsets;      // length: max_rows + 1
+        char* buffer;           // contiguous string data
+        uint32_t buf_capacity;  // allocated buffer size
+        uint32_t buf_used;      // bytes written so far
+
+        StringColumn()
+            : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0) 
{}
+
+        void init(uint32_t max_rows, uint32_t init_buf_capacity) {
+            offsets = (uint32_t*)common::mem_alloc(
+                sizeof(uint32_t) * (max_rows + 1), common::MOD_DEFAULT);
+            offsets[0] = 0;
+            buf_capacity = init_buf_capacity;
+            buffer =
+                (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT);
+            buf_used = 0;
+        }

Review Comment:
   Should the results of the two mem_alloc calls be checked, returning OOM on failure?
   
   Also, use MOD_TABLET instead of MOD_DEFAULT.



##########
cpp/src/writer/tsfile_writer.cc:
##########
@@ -953,14 +1022,22 @@ TsFileWriter::split_tablet_by_device(const Tablet& 
tablet) {
         return result;
     }
 
-    for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) {
-        std::shared_ptr<IDeviceID> cur_device_id(tablet.get_device_id(i));
-        if (*cur_device_id != *last_device_id) {
-            result.emplace_back(std::move(last_device_id), i);
-            last_device_id = std::move(cur_device_id);
-        }
+    const uint32_t row_count = tablet.get_cur_row_size();
+    if (row_count == 0) return result;
+
+    auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id");
+    result.emplace_back(std::move(sentinel), 0);
+
+    auto boundaries = tablet.find_all_device_boundaries();
+
+    uint32_t seg_start = 0;
+    for (uint32_t b : boundaries) {
+        std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start));

Review Comment:
   Consider adding an IDeviceID implementation that points directly to a row in a 
tablet.



##########
cpp/src/common/tablet.cc:
##########
@@ -85,8 +85,9 @@ int Tablet::init() {
             case BLOB:
             case TEXT:
             case STRING: {
-                value_matrix_[c].string_data =
-                    (common::String*)malloc(sizeof(String) * max_row_num_);
+                auto* sc = new StringColumn();
+                sc->init(max_row_num_, max_row_num_ * 32);
+                value_matrix_[c].string_col = sc;

Review Comment:
   Use mem_alloc?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to