This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 75e0664c Perf: optimize Tablet write with columnar string storage and
lazy DeviceID construction (~10x throughput) (#748)
75e0664c is described below
commit 75e0664c081c1661243ed034d963824b386cdf63
Author: Colin Lee <[email protected]>
AuthorDate: Tue Mar 31 21:28:10 2026 +0800
Perf: optimize Tablet write with columnar string storage and lazy DeviceID
construction (~10x throughput) (#748)
---
cpp/src/common/tablet.cc | 158 +++++++++++++++++++++++---
cpp/src/common/tablet.h | 80 ++++++++++++-
cpp/src/cwrapper/arrow_c.cc | 122 +++++++++++++++-----
cpp/src/writer/tsfile_writer.cc | 73 +++++++++---
cpp/src/writer/tsfile_writer.h | 5 +-
cpp/test/common/tsblock/arrow_tsblock_test.cc | 156 ++++++++++++++++++++++++-
6 files changed, 529 insertions(+), 65 deletions(-)
diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc
index afe62ab1..4088a692 100644
--- a/cpp/src/common/tablet.cc
+++ b/cpp/src/common/tablet.cc
@@ -98,10 +98,15 @@ int Tablet::init() {
case BLOB:
case TEXT:
case STRING: {
- value_matrix_[c].string_data =
- static_cast<common::String*>(common::mem_alloc(
- sizeof(String) * max_row_num_, common::MOD_TABLET));
- if (value_matrix_[c].string_data == nullptr) return E_OOM;
+ auto* sc = static_cast<StringColumn*>(common::mem_alloc(
+ sizeof(StringColumn), common::MOD_TABLET));
+ if (sc == nullptr) return E_OOM;
+ new (sc) StringColumn();
+ // 8 bytes/row is a conservative initial estimate for short
+ // string columns (e.g. device IDs, tags). The buffer grows
+ // automatically on demand via mem_realloc.
+ sc->init(max_row_num_, max_row_num_ * 8);
+ value_matrix_[c].string_col = sc;
break;
}
default:
@@ -150,7 +155,8 @@ void Tablet::destroy() {
case BLOB:
case TEXT:
case STRING:
- common::mem_free(value_matrix_[c].string_data);
+ value_matrix_[c].string_col->destroy();
+ common::mem_free(value_matrix_[c].string_col);
break;
default:
break;
@@ -240,17 +246,51 @@ int Tablet::set_column_values(uint32_t schema_index,
const void* data,
return E_TYPE_NOT_SUPPORTED;
}
+ std::memcpy(dst, data, count * elem_size);
if (bitmap == nullptr) {
- // All valid: bulk copy + mark all as non-null
- std::memcpy(dst, data, count * elem_size);
bitmaps_[schema_index].clear_all();
} else {
- // Bulk copy all data (null positions will have garbage but won't be
- // read).
- std::memcpy(dst, data, count * elem_size);
+ char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
+ uint32_t bm_bytes = (count + 7) / 8;
+ std::memcpy(tsfile_bm, bitmap, bm_bytes);
+ }
+ cur_row_size_ = std::max(count, cur_row_size_);
+ return E_OK;
+}
+
+int Tablet::set_column_string_values(uint32_t schema_index,
+ const int32_t* offsets, const char* data,
+ const uint8_t* bitmap, uint32_t count) {
+ if (err_code_ != E_OK) {
+ return err_code_;
+ }
+ if (UNLIKELY(schema_index >= schema_vec_->size())) {
+ return E_OUT_OF_RANGE;
+ }
+ if (UNLIKELY(count > static_cast<uint32_t>(max_row_num_))) {
+ return E_OUT_OF_RANGE;
+ }
+
+ StringColumn* sc = value_matrix_[schema_index].string_col;
+ if (sc == nullptr) {
+ return E_INVALID_ARG;
+ }
+
+ uint32_t total_bytes = static_cast<uint32_t>(offsets[count]);
+ if (total_bytes > sc->buf_capacity) {
+ sc->buf_capacity = total_bytes;
+ sc->buffer = (char*)mem_realloc(sc->buffer, sc->buf_capacity);
+ }
- // bitmap uses TsFile convention (1=null, 0=valid), same as
- // internal BitMap, so copy directly.
+ if (total_bytes > 0) {
+ std::memcpy(sc->buffer, data, total_bytes);
+ }
+ std::memcpy(sc->offsets, offsets, (count + 1) * sizeof(int32_t));
+ sc->buf_used = total_bytes;
+
+ if (bitmap == nullptr) {
+ bitmaps_[schema_index].clear_all();
+ } else {
char* tsfile_bm = bitmaps_[schema_index].get_bitmap();
uint32_t bm_bytes = (count + 7) / 8;
std::memcpy(tsfile_bm, bitmap, bm_bytes);
@@ -292,9 +332,10 @@ void* Tablet::get_value(int row_index, uint32_t
schema_index,
double* double_values = column_values.double_data;
return &double_values[row_index];
}
+ case TEXT:
+ case BLOB:
case STRING: {
- auto string_values = column_values.string_data;
- return &string_values[row_index];
+ return &column_values.string_col->get_string_view(row_index);
}
default:
return nullptr;
@@ -304,8 +345,8 @@ void* Tablet::get_value(int row_index, uint32_t
schema_index,
template <>
void Tablet::process_val(uint32_t row_index, uint32_t schema_index,
                         common::String str) {
    // Copy the bytes of `str` into the column's contiguous buffer, then flag
    // the cell as present: a cleared bitmap bit means "non-null".
    StringColumn* const column = value_matrix_[schema_index].string_col;
    column->append(row_index, str.buf_, str.len_);
    bitmaps_[schema_index].clear(row_index);
}
@@ -450,6 +491,91 @@ void Tablet::set_column_categories(
}
}
+void Tablet::reset_string_columns() {
+ size_t schema_count = schema_vec_->size();
+ for (size_t c = 0; c < schema_count; c++) {
+ const MeasurementSchema& schema = schema_vec_->at(c);
+ if (schema.data_type_ == STRING || schema.data_type_ == TEXT ||
+ schema.data_type_ == BLOB) {
+ value_matrix_[c].string_col->reset();
+ }
+ }
+}
+
+// Find all row indices where the device ID changes. A device ID is the
+// composite key formed by all id columns (e.g. region + sensor_id). Row i
+// is a boundary when at least one id column differs between row i-1 and row i.
+//
+// Example (2 id columns: region, sensor_id):
+// row 0: "A", "s1"
+// row 1: "A", "s2" <- boundary: sensor_id changed
+// row 2: "B", "s1" <- boundary: region changed
+// row 3: "B", "s1"
+// row 4: "B", "s2" <- boundary: sensor_id changed
+// result: [1, 2, 4]
+//
+// Boundaries are computed in one shot at flush time rather than maintained
+// incrementally during add_value / set_column_*. The total work is similar
+// either way, but batch computation here is far more CPU-friendly: the inner
+// loop is a tight memcmp scan over contiguous buffers with good cache
+// locality, and the CPU can pipeline comparisons without the branch overhead
+// and cache thrashing of per-row bookkeeping spread across the write path.
+std::vector<uint32_t> Tablet::find_all_device_boundaries() const {
+ const uint32_t row_count = get_cur_row_size();
+ if (row_count <= 1) return {};
+
+ const uint32_t nwords = (row_count + 63) / 64;
+ std::vector<uint64_t> boundary(nwords, 0);
+
+ uint32_t boundary_count = 0;
+ const uint32_t max_boundaries = row_count - 1;
+ for (auto it = id_column_indexes_.rbegin(); it !=
id_column_indexes_.rend();
+ ++it) {
+ const StringColumn& sc = *value_matrix_[*it].string_col;
+ const int32_t* off = sc.offsets;
+ const char* buf = sc.buffer;
+ for (uint32_t i = 1; i < row_count; i++) {
+ if (boundary[i >> 6] & (1ULL << (i & 63))) continue;
+ int32_t len_a = off[i] - off[i - 1];
+ int32_t len_b = off[i + 1] - off[i];
+ if (len_a != len_b ||
+ (len_a > 0 && memcmp(buf + off[i - 1], buf + off[i],
+ static_cast<uint32_t>(len_a)) != 0)) {
+ boundary[i >> 6] |= (1ULL << (i & 63));
+ if (++boundary_count >= max_boundaries) break;
+ }
+ }
+ if (boundary_count >= max_boundaries) break;
+ }
+
+ // Sweep the bitmap word by word, extracting set bit positions in order.
+ // Each word covers 64 consecutive rows: word w covers rows [w*64,
w*64+63].
+ //
+ // For each word we use two standard bit tricks:
+ // __builtin_ctzll(bits) — count trailing zeros = index of lowest set
bit
+ // bits &= bits - 1 — clear the lowest set bit
+ //
+ // Example: w=1, bits=0b...00010100 (bits 2 and 4 set)
+ // iter 1: ctzll=2 → idx=1*64+2=66, bits becomes 0b...00010000
+ // iter 2: ctzll=4 → idx=1*64+4=68, bits becomes 0b...00000000 → exit
+ //
+ // Guards: idx>0 because row 0 can never be a boundary (no predecessor);
+ // idx<row_count trims padding bits in the last word when row_count%64 !=
0.
+ std::vector<uint32_t> result;
+ for (uint32_t w = 0; w < nwords; w++) {
+ uint64_t bits = boundary[w];
+ while (bits) {
+ uint32_t bit = __builtin_ctzll(bits);
+ uint32_t idx = w * 64 + bit;
+ if (idx > 0 && idx < row_count) {
+ result.push_back(idx);
+ }
+ bits &= bits - 1;
+ }
+ }
+ return result;
+}
+
std::shared_ptr<IDeviceID> Tablet::get_device_id(int i) const {
std::vector<std::string*> id_array;
id_array.push_back(new std::string(insert_target_name_));
diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h
index e47aa5c4..beedacc0 100644
--- a/cpp/src/common/tablet.h
+++ b/cpp/src/common/tablet.h
@@ -46,6 +46,71 @@ class TabletColIterator;
* with their associated metadata such as column names and types.
*/
class Tablet {
+ // Arrow-style string column: offsets + contiguous buffer.
+ // string[i] = buffer + offsets[i], len = offsets[i+1] - offsets[i]
+ struct StringColumn {
+ int32_t* offsets; // length: max_rows + 1 (Arrow-compatible)
+ char* buffer; // contiguous string data
+ uint32_t buf_capacity; // allocated buffer size
+ uint32_t buf_used; // bytes written so far
+
+ StringColumn()
+ : offsets(nullptr), buffer(nullptr), buf_capacity(0), buf_used(0)
{}
+
+ void init(uint32_t max_rows, uint32_t init_buf_capacity) {
+ offsets = (int32_t*)common::mem_alloc(
+ sizeof(int32_t) * (max_rows + 1), common::MOD_DEFAULT);
+ offsets[0] = 0;
+ buf_capacity = init_buf_capacity;
+ buffer =
+ (char*)common::mem_alloc(buf_capacity, common::MOD_DEFAULT);
+ buf_used = 0;
+ }
+
+ void destroy() {
+ if (offsets) common::mem_free(offsets);
+ offsets = nullptr;
+ if (buffer) common::mem_free(buffer);
+ buffer = nullptr;
+ buf_capacity = buf_used = 0;
+ }
+
+ void reset() {
+ buf_used = 0;
+ if (offsets) offsets[0] = 0;
+ }
+
+ void append(uint32_t row, const char* data, uint32_t len) {
+ // Grow buffer if needed
+ if (buf_used + len > buf_capacity) {
+ buf_capacity = buf_capacity * 2 + len;
+ buffer = (char*)common::mem_realloc(buffer, buf_capacity);
+ }
+ memcpy(buffer + buf_used, data, len);
+ offsets[row] = static_cast<int32_t>(buf_used);
+ offsets[row + 1] = static_cast<int32_t>(buf_used + len);
+ buf_used += len;
+ }
+
+ const char* get_str(uint32_t row) const {
+ return buffer + offsets[row];
+ }
+ uint32_t get_len(uint32_t row) const {
+ return static_cast<uint32_t>(offsets[row + 1] - offsets[row]);
+ }
+ // Return a String view for a given row. The returned reference is
+ // valid until the next call to get_string_view on this column.
+ common::String& get_string_view(uint32_t row) {
+ view_cache_.buf_ = buffer + offsets[row];
+ view_cache_.len_ =
+ static_cast<uint32_t>(offsets[row + 1] - offsets[row]);
+ return view_cache_;
+ }
+
+ private:
+ common::String view_cache_;
+ };
+
struct ValueMatrixEntry {
union {
int32_t* int32_data;
@@ -53,7 +118,7 @@ class Tablet {
float* float_data;
double* double_data;
bool* bool_data;
- common::String* string_data;
+ StringColumn* string_col;
};
};
@@ -220,6 +285,16 @@ class Tablet {
void set_column_categories(
const std::vector<common::ColumnCategory>& column_categories);
std::shared_ptr<IDeviceID> get_device_id(int i) const;
+ std::vector<uint32_t> find_all_device_boundaries() const;
+
+ // Bulk copy string column data (offsets + data buffer).
+ // offsets has count+1 entries and must start from 0 (offsets[0] == 0).
+ // bitmap follows TsFile convention (bit=1 means null, nullptr means all
+ // valid). Callers using Arrow convention (bit=1 means valid) must invert
+ // before calling.
+ int set_column_string_values(uint32_t schema_index, const int32_t* offsets,
+ const char* data, const uint8_t* bitmap,
+ uint32_t count);
/**
* @brief Template function to add a value of type T to the specified row
* and column by name.
@@ -253,6 +328,8 @@ class Tablet {
schema_map_ = schema_map;
}
+ void reset_string_columns();
+
friend class TabletColIterator;
friend class TsFileWriter;
friend struct MeasurementNamesFromTablet;
@@ -265,7 +342,6 @@ class Tablet {
private:
template <typename T>
void process_val(uint32_t row_index, uint32_t schema_index, T val);
- common::PageArena page_arena_{common::MOD_TABLET};
uint32_t max_row_num_;
uint32_t cur_row_size_;
std::string insert_target_name_;
diff --git a/cpp/src/cwrapper/arrow_c.cc b/cpp/src/cwrapper/arrow_c.cc
index 6f56cfc6..931c17de 100644
--- a/cpp/src/cwrapper/arrow_c.cc
+++ b/cpp/src/cwrapper/arrow_c.cc
@@ -714,6 +714,43 @@ int TsBlockToArrowStruct(common::TsBlock& tsblock,
ArrowArray* out_array,
return common::E_OK;
}
+// Allocate and return a TsFile null bitmap (bit=1=null) by inverting an Arrow
+// validity bitmap (bit=1=valid). bit_offset is the Arrow array's offset field;
+// bits [bit_offset, bit_offset+n_rows) are extracted and inverted.
+// Returns nullptr if validity is nullptr (all rows valid, no allocation
needed)
+// or on OOM. Caller must mem_free the result.
+// To distinguish OOM from "no validity": OOM only when validity!=nullptr &&
+// result==nullptr.
+static uint8_t* InvertArrowBitmap(const uint8_t* validity, int64_t bit_offset,
+ uint32_t n_rows) {
+ if (validity == nullptr) {
+ return nullptr;
+ }
+ uint32_t bm_bytes = (n_rows + 7) / 8;
+ uint8_t* null_bm =
+ static_cast<uint8_t*>(common::mem_alloc(bm_bytes,
common::MOD_TSBLOCK));
+ if (null_bm == nullptr) {
+ return nullptr;
+ }
+ if (bit_offset == 0) {
+ // Fast path: byte-level invert when there is no bit misalignment.
+ for (uint32_t b = 0; b < bm_bytes; b++) {
+ null_bm[b] = ~validity[b];
+ }
+ } else {
+ // Sliced array: extract one bit at a time starting at bit_offset.
+ std::memset(null_bm, 0, bm_bytes);
+ for (uint32_t i = 0; i < n_rows; i++) {
+ int64_t src = bit_offset + i;
+ uint8_t valid = (validity[src / 8] >> (src % 8)) & 1;
+ if (!valid) {
+ null_bm[i / 8] |= static_cast<uint8_t>(1u << (i % 8));
+ }
+ }
+ }
+ return null_bm;
+}
+
// Check if Arrow row is valid (non-null) based on validity bitmap
static bool ArrowIsValid(const ArrowArray* arr, int64_t row) {
if (arr->null_count == 0 || arr->buffers[0] == nullptr) return true;
@@ -814,6 +851,13 @@ int ArrowStructToTablet(const char* table_name, const
ArrowArray* in_array,
const ArrowArray* col_arr = in_array->children[data_col_indices[ci]];
common::TSDataType dtype = read_modes[ci];
uint32_t tcol = static_cast<uint32_t>(ci);
+ // ArrowArray::offset is non-zero when the array is a slice of a larger
+ // buffer — for example, when Python pandas/PyArrow passes a column
that
+ // was created via slice(), take(), or filter() without a copy, or when
+ // RecordBatch::Slice() is used to split a batch. In those cases the
+ // underlying buffer starts at element 0 of the original allocation, so
+ // all buffer accesses (data, offsets, validity bitmap) must be shifted
+ // by `off` before reading the `length` visible elements.
int64_t off = col_arr->offset;
const uint8_t* validity =
@@ -837,26 +881,21 @@ int ArrowStructToTablet(const char* table_name, const
ArrowArray* in_array,
case common::INT64:
case common::FLOAT:
case common::DOUBLE: {
- // Invert Arrow bitmap (1=valid) to TsFile bitmap (1=null)
- const uint8_t* null_bm = nullptr;
- uint8_t* inverted_bm = nullptr;
- if (validity != nullptr) {
- uint32_t bm_bytes = (static_cast<uint32_t>(n_rows) + 7) /
8;
- inverted_bm = static_cast<uint8_t*>(
- common::mem_alloc(bm_bytes, common::MOD_TSBLOCK));
- if (inverted_bm == nullptr) {
- delete tablet;
- return common::E_OOM;
- }
- for (uint32_t b = 0; b < bm_bytes; b++) {
- inverted_bm[b] = ~validity[b];
- }
- null_bm = inverted_bm;
+ size_t elem_size =
+ (dtype == common::INT64 || dtype == common::DOUBLE) ? 8 :
4;
+ const void* data =
+ static_cast<const char*>(col_arr->buffers[1]) +
+ off * elem_size;
+ uint8_t* null_bm = InvertArrowBitmap(
+ validity, off, static_cast<uint32_t>(n_rows));
+ if (validity != nullptr && null_bm == nullptr) {
+ delete tablet;
+ return common::E_OOM;
}
- tablet->set_column_values(tcol, col_arr->buffers[1], null_bm,
+ tablet->set_column_values(tcol, data, null_bm,
static_cast<uint32_t>(n_rows));
- if (inverted_bm != nullptr) {
- common::mem_free(inverted_bm);
+ if (null_bm != nullptr) {
+ common::mem_free(null_bm);
}
break;
}
@@ -877,16 +916,45 @@ int ArrowStructToTablet(const char* table_name, const
ArrowArray* in_array,
case common::TEXT:
case common::STRING:
case common::BLOB: {
- const int32_t* offsets =
- static_cast<const int32_t*>(col_arr->buffers[1]);
- const char* data =
+ // set_column_string_values requires offsets[0] == 0.
+ // When off > 0 (sliced Arrow array), normalize here: shift
+ // offsets down by base and advance the data pointer
+ // accordingly.
+ const int32_t* raw_offsets =
+ static_cast<const int32_t*>(col_arr->buffers[1]) + off;
+ const char* raw_data =
static_cast<const char*>(col_arr->buffers[2]);
- for (int64_t r = 0; r < n_rows; r++) {
- if (!ArrowIsValid(col_arr, r)) continue;
- int32_t start = offsets[off + r];
- int32_t len = offsets[off + r + 1] - start;
- tablet->add_value(static_cast<uint32_t>(r), tcol,
- common::String(data + start, len));
+ uint32_t nrows = static_cast<uint32_t>(n_rows);
+ const int32_t* offsets = raw_offsets;
+ const char* data = raw_data;
+ int32_t* norm_offsets = nullptr;
+ if (off > 0) {
+ int32_t base = raw_offsets[0];
+ norm_offsets = static_cast<int32_t*>(common::mem_alloc(
+ (nrows + 1) * sizeof(int32_t), common::MOD_TSBLOCK));
+ if (norm_offsets == nullptr) {
+ delete tablet;
+ return common::E_OOM;
+ }
+ for (uint32_t i = 0; i <= nrows; i++) {
+ norm_offsets[i] = raw_offsets[i] - base;
+ }
+ offsets = norm_offsets;
+ data = raw_data + base;
+ }
+ uint8_t* null_bm = InvertArrowBitmap(validity, off, nrows);
+ if (validity != nullptr && null_bm == nullptr) {
+ common::mem_free(norm_offsets);
+ delete tablet;
+ return common::E_OOM;
+ }
+ tablet->set_column_string_values(tcol, offsets, data, null_bm,
+ nrows);
+ if (null_bm != nullptr) {
+ common::mem_free(null_bm);
+ }
+ if (norm_offsets != nullptr) {
+ common::mem_free(norm_offsets);
}
break;
}
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 786325db..657fcabc 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -1412,6 +1412,9 @@ int TsFileWriter::write_table(Tablet& tablet) {
}
}
record_count_since_last_flush_ += tablet.cur_row_size_;
+ // Reset string column buffers so the tablet can be reused for the next
+ // batch without accumulating memory across writes.
+ tablet.reset_string_columns();
ret = check_memory_size_and_may_flush_chunks();
return ret;
}
@@ -1419,10 +1422,10 @@ int TsFileWriter::write_table(Tablet& tablet) {
std::vector<std::pair<std::shared_ptr<IDeviceID>, int>>
TsFileWriter::split_tablet_by_device(const Tablet& tablet) {
std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> result;
- std::shared_ptr<IDeviceID> last_device_id =
- std::make_shared<StringArrayDeviceID>("last_device_id");
+
if (tablet.id_column_indexes_.empty()) {
- result.emplace_back(std::move(last_device_id), 0);
+ auto sentinel =
std::make_shared<StringArrayDeviceID>("last_device_id");
+ result.emplace_back(std::move(sentinel), 0);
std::vector<std::string*> id_array;
id_array.push_back(new std::string(tablet.insert_target_name_));
auto res = std::make_shared<StringArrayDeviceID>(id_array);
@@ -1431,14 +1434,22 @@ TsFileWriter::split_tablet_by_device(const Tablet&
tablet) {
return result;
}
- for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) {
- std::shared_ptr<IDeviceID> cur_device_id(tablet.get_device_id(i));
- if (*cur_device_id != *last_device_id) {
- result.emplace_back(std::move(last_device_id), i);
- last_device_id = std::move(cur_device_id);
- }
+ const uint32_t row_count = tablet.get_cur_row_size();
+ if (row_count == 0) return result;
+
+ auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id");
+ result.emplace_back(std::move(sentinel), 0);
+
+ auto boundaries = tablet.find_all_device_boundaries();
+
+ uint32_t seg_start = 0;
+ for (uint32_t b : boundaries) {
+ std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start));
+ result.emplace_back(std::move(dev_id), b);
+ seg_start = b;
}
- result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size());
+ std::shared_ptr<IDeviceID> last_id(tablet.get_device_id(seg_start));
+ result.emplace_back(std::move(last_id), row_count);
return result;
}
@@ -1474,7 +1485,7 @@ int TsFileWriter::write_column(ChunkWriter* chunk_writer,
const Tablet& tablet,
col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::STRING) {
ret =
- write_typed_column(chunk_writer, timestamps,
col_values.string_data,
+ write_typed_column(chunk_writer, timestamps, col_values.string_col,
col_notnull_bitmap, start_idx, end_idx);
} else {
ASSERT(false);
@@ -1539,8 +1550,8 @@ int TsFileWriter::value_write_column(ValueChunkWriter*
value_chunk_writer,
case common::TEXT:
case common::BLOB:
ret = write_typed_column(value_chunk_writer, timestamps,
- (common::String*)col_values.string_data,
- col_notnull_bitmap, start_idx, end_idx);
+ col_values.string_col, col_notnull_bitmap,
+ start_idx, end_idx);
break;
default:
ret = E_NOT_SUPPORT;
@@ -1618,10 +1629,22 @@ int TsFileWriter::write_typed_column(ChunkWriter*
chunk_writer,
// Writes the non-null cells of rows [start_idx, end_idx) of a string-like
// column to a non-aligned chunk. A set bit in col_notnull_bitmap marks a
// null cell, which is skipped. Stops at, and returns, the first write error.
int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
                                     int64_t* timestamps,
                                     Tablet::StringColumn* string_col,
                                     BitMap& col_notnull_bitmap,
                                     uint32_t start_idx, uint32_t end_idx) {
    int ret = E_OK;
    for (uint32_t row = start_idx; row < end_idx; ++row) {
        if (UNLIKELY(col_notnull_bitmap.test(row))) {
            continue;  // null cell: nothing to write
        }
        const int32_t begin = string_col->offsets[row];
        const int32_t end = string_col->offsets[row + 1];
        common::String cell(string_col->buffer + begin,
                            static_cast<uint32_t>(end - begin));
        if (RET_FAIL(chunk_writer->write(timestamps[row], cell))) {
            break;
        }
    }
    return ret;
}
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
@@ -1661,10 +1684,26 @@ int TsFileWriter::write_typed_column(ValueChunkWriter*
value_chunk_writer,
// Writes rows [start_idx, end_idx) of a string-like column to an aligned
// value chunk. Every row is forwarded; the third write() argument tells the
// writer whether the cell is null (a set bit in col_notnull_bitmap marks a
// null cell). Stops at, and returns, the first write error.
//
// Null cells are passed as an empty String instead of one built from
// offsets[]: the offset entries of a null cell need not describe a value
// (they may never have been written), so they are not read at all.
// NOTE(review): assumes ValueChunkWriter::write ignores the value payload
// when the null flag is true — confirm.
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
                                     int64_t* timestamps,
                                     Tablet::StringColumn* string_col,
                                     common::BitMap& col_notnull_bitmap,
                                     uint32_t start_idx, uint32_t end_idx) {
    int ret = E_OK;
    for (uint32_t r = start_idx; r < end_idx; r++) {
        const bool is_null = col_notnull_bitmap.test(r);
        common::String val;
        // Zero-length placeholder pointing at valid memory for null cells.
        val.buf_ = string_col->buffer;
        val.len_ = 0;
        if (!is_null) {
            val.buf_ = string_col->buffer + string_col->offsets[r];
            val.len_ = static_cast<uint32_t>(string_col->offsets[r + 1] -
                                             string_col->offsets[r]);
        }
        if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, is_null))) {
            return ret;
        }
    }
    return ret;
}
// TODO make sure ret is meaningful to SDK user
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index ff7cdbac..01028e2e 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -145,7 +145,7 @@ class TsFileWriter {
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ChunkWriter* chunk_writer, int64_t* timestamps,
- common::String* col_values,
+ Tablet::StringColumn* string_col,
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);
@@ -206,7 +206,8 @@ class TsFileWriter {
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);
int write_typed_column(ValueChunkWriter* value_chunk_writer,
- int64_t* timestamps, common::String* col_values,
+ int64_t* timestamps,
+ Tablet::StringColumn* string_col,
common::BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx);
diff --git a/cpp/test/common/tsblock/arrow_tsblock_test.cc
b/cpp/test/common/tsblock/arrow_tsblock_test.cc
index 123efb59..348c18a4 100644
--- a/cpp/test/common/tsblock/arrow_tsblock_test.cc
+++ b/cpp/test/common/tsblock/arrow_tsblock_test.cc
@@ -20,6 +20,7 @@
#include <cstring>
+#include "common/tablet.h"
#include "common/tsblock/tsblock.h"
#include "cwrapper/tsfile_cwrapper.h"
#include "utils/db_utils.h"
@@ -34,9 +35,13 @@ using ArrowSchema = ::ArrowSchema;
#define ARROW_FLAG_NULLABLE 2
#define ARROW_FLAG_MAP_KEYS_SORTED 4
-// Function declaration (defined in arrow_c.cc)
+// Function declarations (defined in arrow_c.cc)
int TsBlockToArrowStruct(common::TsBlock& tsblock, ArrowArray* out_array,
ArrowSchema* out_schema);
+int ArrowStructToTablet(const char* table_name, const ArrowArray* in_array,
+ const ArrowSchema* in_schema,
+ const storage::TableSchema* reg_schema,
+ storage::Tablet** out_tablet, int time_col_index);
} // namespace arrow
static void VerifyArrowSchema(
@@ -332,3 +337,152 @@ TEST(ArrowTsBlockTest, TsBlock_EdgeCases) {
}
}
}
+
+// Test ArrowStructToTablet with sliced Arrow arrays (offset > 0).
+// Full arrays have 5 rows; offset=2 on every child means only rows [2..4]
+// (3 rows) are consumed. Row index 3 in the full array (local index 1 in the
+// slice) carries a null in the INT32 column.
+TEST(ArrowStructToTabletTest, SlicedArray_WithOffset) {
+ // --- timestamps (int64, no nulls) ---
+ int64_t ts_data[5] = {1000, 1001, 1002, 1003, 1004};
+ const void* ts_bufs[2] = {nullptr, ts_data};
+ ArrowArray ts_arr = {};
+ ts_arr.length = 3;
+ ts_arr.offset = 2;
+ ts_arr.null_count = 0;
+ ts_arr.n_buffers = 2;
+ ts_arr.buffers = ts_bufs;
+
+ ArrowSchema ts_schema = {};
+ ts_schema.format = "l";
+ ts_schema.name = "time";
+ ts_schema.flags = ARROW_FLAG_NULLABLE;
+
+ // --- INT32 column: values [100..104], row 3 (global) = local row 1 null
+ // Arrow validity bitmap: bit=1 means valid.
+ // bits 0,1,2,4=valid, bit 3=null → byte 0 = 0b00010111 = 0x17
+ int32_t int_data[5] = {100, 101, 102, 103, 104};
+ uint8_t int_validity[1] = {0x17};
+ const void* int_bufs[2] = {int_validity, int_data};
+ ArrowArray int_arr = {};
+ int_arr.length = 3;
+ int_arr.offset = 2;
+ int_arr.null_count = 1;
+ int_arr.n_buffers = 2;
+ int_arr.buffers = int_bufs;
+
+ ArrowSchema int_schema = {};
+ int_schema.format = "i";
+ int_schema.name = "int_col";
+ int_schema.flags = ARROW_FLAG_NULLABLE;
+
+ // --- DOUBLE column: values [10.0..14.0], no nulls ---
+ double dbl_data[5] = {10.0, 11.0, 12.0, 13.0, 14.0};
+ const void* dbl_bufs[2] = {nullptr, dbl_data};
+ ArrowArray dbl_arr = {};
+ dbl_arr.length = 3;
+ dbl_arr.offset = 2;
+ dbl_arr.null_count = 0;
+ dbl_arr.n_buffers = 2;
+ dbl_arr.buffers = dbl_bufs;
+
+ ArrowSchema dbl_schema = {};
+ dbl_schema.format = "g";
+ dbl_schema.name = "dbl_col";
+ dbl_schema.flags = ARROW_FLAG_NULLABLE;
+
+ // --- UTF-8 string column: "str0".."str4", no nulls ---
+ // With offset=2, the slice covers "str2","str3","str4".
+ const char str_chars[] = "str0str1str2str3str4";
+ int32_t str_offs[6] = {0, 4, 8, 12, 16, 20};
+ const void* str_bufs[3] = {nullptr, str_offs, str_chars};
+ ArrowArray str_arr = {};
+ str_arr.length = 3;
+ str_arr.offset = 2;
+ str_arr.null_count = 0;
+ str_arr.n_buffers = 3;
+ str_arr.buffers = str_bufs;
+
+ ArrowSchema str_schema = {};
+ str_schema.format = "u";
+ str_schema.name = "str_col";
+ str_schema.flags = ARROW_FLAG_NULLABLE;
+
+ // --- parent struct array ---
+ ArrowArray* children[4] = {&ts_arr, &int_arr, &dbl_arr, &str_arr};
+ ArrowArray parent = {};
+ parent.length = 3;
+ parent.n_buffers = 0;
+ parent.n_children = 4;
+ parent.children = children;
+
+ ArrowSchema* child_schemas[4] = {&ts_schema, &int_schema, &dbl_schema,
+ &str_schema};
+ ArrowSchema parent_schema = {};
+ parent_schema.format = "+s";
+ parent_schema.n_children = 4;
+ parent_schema.children = child_schemas;
+
+ storage::Tablet* tablet = nullptr;
+ // time_col_index=0 → timestamp from ts_arr; data cols are int, dbl, str
+ int ret = arrow::ArrowStructToTablet("test_table", &parent, &parent_schema,
+ nullptr, &tablet, 0);
+ ASSERT_EQ(ret, common::E_OK);
+ ASSERT_NE(tablet, nullptr);
+
+ EXPECT_EQ(tablet->get_cur_row_size(), 3u);
+
+ common::TSDataType dtype;
+ void* v;
+
+ // INT32 col (schema_index=0): local rows 0,1,2 → 102, null, 104
+ v = tablet->get_value(0, 0, dtype);
+ ASSERT_NE(v, nullptr);
+ EXPECT_EQ(*static_cast<int32_t*>(v), 102);
+
+ v = tablet->get_value(1, 0, dtype);
+ EXPECT_EQ(v, nullptr); // row 3 in original data is null
+
+ v = tablet->get_value(2, 0, dtype);
+ ASSERT_NE(v, nullptr);
+ EXPECT_EQ(*static_cast<int32_t*>(v), 104);
+
+ // DOUBLE col (schema_index=1): local rows 0,1,2 → 12.0, 13.0, 14.0
+ v = tablet->get_value(0, 1, dtype);
+ ASSERT_NE(v, nullptr);
+ EXPECT_DOUBLE_EQ(*static_cast<double*>(v), 12.0);
+
+ v = tablet->get_value(1, 1, dtype);
+ ASSERT_NE(v, nullptr);
+ EXPECT_DOUBLE_EQ(*static_cast<double*>(v), 13.0);
+
+ v = tablet->get_value(2, 1, dtype);
+ ASSERT_NE(v, nullptr);
+ EXPECT_DOUBLE_EQ(*static_cast<double*>(v), 14.0);
+
+ // STRING col (schema_index=2): local rows 0,1,2 → "str2","str3","str4"
+ // Arrow "u" maps to common::TEXT; offset normalization in arrow_c.cc
+ // ensures offsets[0]==0 before calling set_column_string_values.
+ v = tablet->get_value(0, 2, dtype);
+ ASSERT_NE(v, nullptr);
+ {
+ common::String* s = static_cast<common::String*>(v);
+ EXPECT_EQ(std::string(s->buf_, s->len_), "str2");
+ }
+
+ v = tablet->get_value(1, 2, dtype);
+ ASSERT_NE(v, nullptr);
+ {
+ common::String* s = static_cast<common::String*>(v);
+ EXPECT_EQ(std::string(s->buf_, s->len_), "str3");
+ }
+
+ v = tablet->get_value(2, 2, dtype);
+ ASSERT_NE(v, nullptr);
+ {
+ common::String* s = static_cast<common::String*>(v);
+ EXPECT_EQ(std::string(s->buf_, s->len_), "str4");
+ }
+
+ delete tablet;
+}