This is an automated email from the ASF dual-hosted git repository.
hongzhigao pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new 55d5932ed Fix/aligned tv page seal (#734)
55d5932ed is described below
commit 55d5932ed5330252f73cf68f1a98060eccbf3673
Author: Hongzhi Gao <[email protected]>
AuthorDate: Mon Mar 30 13:10:51 2026 +0800
Fix/aligned tv page seal (#734)
* fix readme logo
* fix readme logo
* fix readme badge
* tmp
* add ut
* mvn spotless:apply
* tmp
* try to fix ut
* Align C++ aligned-model page sealing with the Java behavior and fix
reader handling of null-only value pages so that Debug builds pass.
* fix ut
* Add strict_page_size switch to optimize aligned tablet writing.
In non-strict mode, disable per-write auto page sealing and seal value
pages at time-page boundaries to reduce overhead while preserving aligned page
semantics.
* fix QueryByRowFasterThanManualNext tolerance
* fix time_page_row_ends.reserve
---
cpp/src/common/config/config.h | 7 +
cpp/src/common/global.cc | 6 +
cpp/src/reader/aligned_chunk_reader.cc | 3 -
cpp/src/reader/qds_without_timegenerator.cc | 19 +-
cpp/src/writer/time_chunk_writer.cc | 3 +
cpp/src/writer/time_chunk_writer.h | 34 +-
cpp/src/writer/tsfile_writer.cc | 504 ++++++++++++++++++++-
cpp/src/writer/tsfile_writer.h | 6 +
cpp/src/writer/value_chunk_writer.cc | 12 +-
cpp/src/writer/value_chunk_writer.h | 31 +-
cpp/src/writer/value_page_writer.cc | 12 +-
cpp/src/writer/value_page_writer.h | 17 +-
.../reader/table_view/tsfile_reader_table_test.cc | 15 +
.../table_view/tsfile_table_query_by_row_test.cc | 4 +-
.../tree_view/tsfile_tree_query_by_row_test.cc | 4 +-
cpp/test/writer/tsfile_writer_test.cc | 235 ++++++++++
16 files changed, 867 insertions(+), 45 deletions(-)
diff --git a/cpp/src/common/config/config.h b/cpp/src/common/config/config.h
index 0f192c8d2..81dad924f 100644
--- a/cpp/src/common/config/config.h
+++ b/cpp/src/common/config/config.h
@@ -46,6 +46,12 @@ typedef struct ConfigValue {
TSEncoding double_encoding_type_;
TSEncoding string_encoding_type_;
CompressionType default_compression_type_;
+ // When true, aligned writer enforces page size limit strictly by
+ // interleaving time/value writes and sealing pages together when any side
+ // becomes full.
+ // When false, aligned writer may disable some page-size checks to improve
+ // write performance.
+ bool strict_page_size_ = true;
} ConfigValue;
extern void init_config_value();
@@ -57,6 +63,7 @@ extern void set_config_value();
extern void config_set_page_max_point_count(uint32_t page_max_point_count);
extern void config_set_max_degree_of_index_node(
uint32_t max_degree_of_index_node);
+extern void config_set_strict_page_size(bool strict_page_size);
} // namespace common
diff --git a/cpp/src/common/global.cc b/cpp/src/common/global.cc
index fd1d0132d..91ecedda1 100644
--- a/cpp/src/common/global.cc
+++ b/cpp/src/common/global.cc
@@ -60,6 +60,8 @@ void init_config_value() {
#else
g_config_value_.default_compression_type_ = UNCOMPRESSED;
#endif
+ // Enforce aligned page size limits strictly by default.
+ g_config_value_.strict_page_size_ = true;
}
extern TSEncoding get_value_encoder(TSDataType data_type) {
@@ -104,6 +106,10 @@ void config_set_max_degree_of_index_node(uint32_t
max_degree_of_index_node) {
g_config_value_.max_degree_of_index_node_ = max_degree_of_index_node;
}
+void config_set_strict_page_size(bool strict_page_size) {
+ g_config_value_.strict_page_size_ = strict_page_size;
+}
+
void set_config_value() {}
const char* s_data_type_names[8] = {"BOOLEAN", "INT32", "INT64", "FLOAT",
"DOUBLE", "TEXT", "VECTOR", "STRING"};
diff --git a/cpp/src/reader/aligned_chunk_reader.cc
b/cpp/src/reader/aligned_chunk_reader.cc
index 51da63e84..2d117b1c9 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -562,7 +562,6 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
row_appender.append_null(1);
\
continue;
\
}
\
- assert(value_decoder_->has_remaining(value_in));
\
if (!value_decoder_->has_remaining(value_in)) {
\
return common::E_DATA_INCONSISTENCY;
\
}
\
@@ -609,7 +608,6 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
row_appender.append_null(1);
continue;
}
- assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return common::E_DATA_INCONSISTENCY;
}
@@ -695,7 +693,6 @@ int AlignedChunkReader::STRING_DECODE_TYPED_TV_INTO_TSBLOCK(
}
if (should_read_data) {
- assert(value_decoder_->has_remaining(value_in));
if (!value_decoder_->has_remaining(value_in)) {
return E_DATA_INCONSISTENCY;
}
diff --git a/cpp/src/reader/qds_without_timegenerator.cc
b/cpp/src/reader/qds_without_timegenerator.cc
index d8129ce0e..0710b5873 100644
--- a/cpp/src/reader/qds_without_timegenerator.cc
+++ b/cpp/src/reader/qds_without_timegenerator.cc
@@ -167,11 +167,14 @@ int QDSWithoutTimeGenerator::next(bool& has_next) {
uint32_t len = 0;
uint32_t idx = heap_time_.begin()->second;
+ bool is_null_val = false;
auto val_datatype = value_iters_[idx]->get_data_type();
- void* val_ptr = value_iters_[idx]->read(&len);
+ void* val_ptr = value_iters_[idx]->read(&len, &is_null_val);
if (!skip_row) {
- row_record_->get_field(idx + 1)->set_value(val_datatype,
- val_ptr, len, pa_);
+ if (!is_null_val) {
+ row_record_->get_field(idx + 1)->set_value(
+ val_datatype, val_ptr, len, pa_);
+ }
}
value_iters_[idx]->next();
@@ -219,10 +222,14 @@ int QDSWithoutTimeGenerator::next(bool& has_next) {
std::multimap<int64_t, uint32_t>::iterator iter =
heap_time_.find(time);
for (uint32_t i = 0; i < count; ++i) {
uint32_t len = 0;
+ bool is_null_val = false;
auto val_datatype = value_iters_[iter->second]->get_data_type();
- void* val_ptr = value_iters_[iter->second]->read(&len);
- row_record_->get_field(iter->second + 1)
- ->set_value(val_datatype, val_ptr, len, pa_);
+ void* val_ptr =
+ value_iters_[iter->second]->read(&len, &is_null_val);
+ if (!is_null_val) {
+ row_record_->get_field(iter->second + 1)
+ ->set_value(val_datatype, val_ptr, len, pa_);
+ }
value_iters_[iter->second]->next();
if (!time_iters_[iter->second]->end()) {
int64_t timev =
diff --git a/cpp/src/writer/time_chunk_writer.cc
b/cpp/src/writer/time_chunk_writer.cc
index 5f004a0f5..0c7e3b212 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -173,6 +173,9 @@ int TimeChunkWriter::end_encode_chunk() {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
+ } else if (num_of_pages_ > 0) {
+ chunk_header_.data_size_ = chunk_data_.total_size();
+ chunk_header_.num_of_pages_ = num_of_pages_;
}
#if DEBUG_SE
std::cout << "end_encode_time_chunk: num_of_pages_=" << num_of_pages_
diff --git a/cpp/src/writer/time_chunk_writer.h
b/cpp/src/writer/time_chunk_writer.h
index 0c6e1f18a..c67516ba5 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -42,7 +42,8 @@ class TimeChunkWriter {
first_page_data_(),
first_page_statistic_(nullptr),
chunk_header_(),
- num_of_pages_(0) {}
+ num_of_pages_(0),
+ enable_page_seal_if_full_(true) {}
~TimeChunkWriter() { destroy(); }
int init(const common::ColumnSchema& col_schema);
int init(const std::string& measurement_name, common::TSEncoding encoding,
@@ -57,8 +58,12 @@ class TimeChunkWriter {
if (RET_FAIL(time_page_writer_.write(timestamp))) {
return ret;
}
- if (RET_FAIL(seal_cur_page_if_full())) {
+ if (UNLIKELY(!enable_page_seal_if_full_)) {
return ret;
+ } else {
+ if (RET_FAIL(seal_cur_page_if_full())) {
+ return ret;
+ }
}
return ret;
}
@@ -68,10 +73,33 @@ class TimeChunkWriter {
Statistic* get_chunk_statistic() { return chunk_statistic_; }
FORCE_INLINE int32_t num_of_pages() const { return num_of_pages_; }
+ // Current (unsealed) page point count.
+ FORCE_INLINE uint32_t get_point_numer() const {
+ return time_page_writer_.get_point_numer();
+ }
+
int64_t estimate_max_series_mem_size();
bool hasData();
+ /** True if the current (unsealed) page has at least one point. */
+ bool has_current_page_data() const {
+ return time_page_writer_.get_point_numer() > 0;
+ }
+
+ /**
+ * Force seal the current page (for aligned model: when any aligned page
+ * seals due to memory/point threshold, all pages must seal together).
+ * @return E_OK on success.
+ */
+ int seal_current_page() { return seal_cur_page(false); }
+
+ // For aligned writer: allow disabling the automatic page-size/point-number
+ // check so the caller can seal pages at chosen boundaries.
+ FORCE_INLINE void set_enable_page_seal_if_full(bool enable) {
+ enable_page_seal_if_full_ = enable;
+ }
+
private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
@@ -110,6 +138,8 @@ class TimeChunkWriter {
ChunkHeader chunk_header_;
int32_t num_of_pages_;
+ // If false, write() won't auto-seal when the current page becomes full.
+ bool enable_page_seal_if_full_;
};
} // end namespace storage
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index 85a816ef6..786325db5 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -61,6 +61,10 @@ void set_max_degree_of_index_node(uint32_t
max_degree_of_index_node) {
config_set_max_degree_of_index_node(max_degree_of_index_node);
}
+void set_strict_page_size(bool strict_page_size) {
+ config_set_strict_page_size(strict_page_size);
+}
+
TsFileWriter::TsFileWriter()
: write_file_(nullptr),
io_writer_(nullptr),
@@ -722,6 +726,14 @@ int TsFileWriter::write_record_aligned(const TsRecord&
record) {
if (value_chunk_writers.size() != record.points_.size()) {
return E_INVALID_ARG;
}
+ int32_t time_pages_before = time_chunk_writer->num_of_pages();
+ std::vector<int32_t> value_pages_before(value_chunk_writers.size(), 0);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writer)) {
+ value_pages_before[c] = value_chunk_writer->num_of_pages();
+ }
+ }
time_chunk_writer->write(record.timestamp_);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
@@ -731,6 +743,11 @@ int TsFileWriter::write_record_aligned(const TsRecord&
record) {
write_point_aligned(value_chunk_writer, record.timestamp_,
data_types[c], record.points_[c]);
}
+ if (RET_FAIL(maybe_seal_aligned_pages_together(
+ time_chunk_writer, value_chunk_writers, time_pages_before,
+ value_pages_before))) {
+ return ret;
+ }
return ret;
}
@@ -792,6 +809,41 @@ int TsFileWriter::write_point_aligned(ValueChunkWriter*
value_chunk_writer,
}
}
+int TsFileWriter::maybe_seal_aligned_pages_together(
+ TimeChunkWriter* time_chunk_writer,
+ common::SimpleVector<ValueChunkWriter*>& value_chunk_writers,
+ int32_t time_pages_before, const std::vector<int32_t>& value_pages_before)
{
+ bool should_seal_all =
+ time_chunk_writer->num_of_pages() > time_pages_before;
+ for (uint32_t c = 0; c < value_chunk_writers.size() && !should_seal_all;
+ c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writer) &&
+ value_chunk_writer->num_of_pages() > value_pages_before[c]) {
+ should_seal_all = true;
+ break;
+ }
+ }
+ if (!should_seal_all) {
+ return E_OK;
+ }
+
+ int ret = E_OK;
+ if (time_chunk_writer->has_current_page_data() &&
+ RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writer) &&
+ value_chunk_writer->has_current_page_data() &&
+ RET_FAIL(value_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ return ret;
+}
+
int TsFileWriter::write_tablet_aligned(const Tablet& tablet) {
int ret = E_OK;
SimpleVector<ValueChunkWriter*> value_chunk_writers;
@@ -804,16 +856,218 @@ int TsFileWriter::write_tablet_aligned(const Tablet&
tablet) {
data_types))) {
return ret;
}
- time_write_column(time_chunk_writer, tablet);
- ASSERT(value_chunk_writers.size() == tablet.get_column_count());
+ const uint32_t total_rows = tablet.get_cur_row_size();
+ const bool strict_page_size = common::g_config_value_.strict_page_size_;
+
+ // Decide whether we have string/blob/text columns.
+ bool has_varlen_column = false;
+ for (uint32_t i = 0; i < data_types.size(); i++) {
+ if (data_types[i] == common::STRING || data_types[i] == common::TEXT ||
+ data_types[i] == common::BLOB) {
+ has_varlen_column = true;
+ break;
+ }
+ }
+
+ // Keep writers' seal-check behavior consistent across calls.
+ time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
- ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(
+ strict_page_size);
+ }
+ }
+
+ if (strict_page_size) {
+ // Strict mode: keep the original row-based insertion to ensure aligned
+ // pages seal together when either side becomes full.
+ for (uint32_t row = 0; row < total_rows; row++) {
+ int32_t time_pages_before = time_chunk_writer->num_of_pages();
+ std::vector<int32_t> value_pages_before(value_chunk_writers.size(),
+ 0);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writer)) {
+ value_pages_before[c] = value_chunk_writer->num_of_pages();
+ }
+ }
+
+ if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) {
+ return ret;
+ }
+ ASSERT(value_chunk_writers.size() == tablet.get_column_count());
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (IS_NULL(value_chunk_writer)) {
+ continue;
+ }
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c,
+ row, row + 1))) {
+ return ret;
+ }
+ }
+ if (RET_FAIL(maybe_seal_aligned_pages_together(
+ time_chunk_writer, value_chunk_writers, time_pages_before,
+ value_pages_before))) {
+ return ret;
+ }
+ }
+ return ret;
+ }
+
+ // Non-strict mode: switch to column-based insertion.
+ if (!has_varlen_column) {
+ // Optimization: when there is no string/blob/text column, we only need
+ // to split by point-number so that each split will trigger a page
+ // seal (and avoid the per-row page-size check).
+ const uint32_t points_per_page =
+ common::g_config_value_.page_writer_max_point_num_;
+
+ // Disable auto page sealing. We will seal pages at split boundaries.
+ time_chunk_writer->set_enable_page_seal_if_full(false);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(false);
+ }
+ }
+
+ // Determine how many points we need to fill the current unsealed time
+ // page (it may already contain data from previous tablets).
+ uint32_t time_cur_points = time_chunk_writer->get_point_numer();
+ if (time_cur_points >= points_per_page &&
+ time_chunk_writer->has_current_page_data()) {
+ // Close the already-full page together with all aligned value
+ // pages.
+ if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
+ if (!IS_NULL(value_chunk_writer) &&
+ value_chunk_writer->has_current_page_data()) {
+ if (RET_FAIL(value_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ }
+ time_cur_points = 0;
+ }
+ const uint32_t first_seg_len =
+ (time_cur_points > 0 && time_cur_points < points_per_page)
+ ? (points_per_page - time_cur_points)
+ : points_per_page;
+
+ // 1) Write time in segments and seal all full segments (except the
+ // last remaining segment).
+ uint32_t seg_start = 0;
+ uint32_t seg_len = first_seg_len;
+ while (seg_start < total_rows) {
+ const uint32_t seg_end = std::min(seg_start + seg_len, total_rows);
+ if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
seg_start,
+ seg_end))) {
+ return ret;
+ }
+ seg_start = seg_end;
+ if (seg_start < total_rows) {
+ if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ seg_len = points_per_page;
+ }
+
+ // 2) Write each value column in the same segments.
+ ASSERT(value_chunk_writers.size() == tablet.get_column_count());
+ for (uint32_t col = 0; col < value_chunk_writers.size(); col++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[col];
+ if (IS_NULL(value_chunk_writer)) {
+ continue;
+ }
+
+ seg_start = 0;
+ seg_len = first_seg_len;
+ while (seg_start < total_rows) {
+ const uint32_t seg_end =
+ std::min(seg_start + seg_len, total_rows);
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
col,
+ seg_start, seg_end))) {
+ return ret;
+ }
+ seg_start = seg_end;
+ if (seg_start < total_rows) {
+ if (value_chunk_writer->has_current_page_data() &&
+ RET_FAIL(value_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ seg_len = points_per_page;
+ }
+ }
+ return ret;
+ }
+
+ // General non-strict (may have varlen STRING/TEXT/BLOB columns):
+ // time auto-seals to provide aligned page boundaries; value writers
+ // skip auto page sealing and are sealed manually at time boundaries.
+ // Attention: since value-side auto-seal is disabled, if a varlen value
+ // page hits the memory threshold earlier, it may not seal immediately
+ // and instead will be sealed later at the recorded time-page boundaries
+ // (this may sacrifice the strict page size limit for performance).
+ time_chunk_writer->set_enable_page_seal_if_full(true);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(false);
+ }
+ }
+
+ std::vector<uint32_t> time_page_row_ends;
+ const uint32_t page_max_points = std::max<uint32_t>(
+ 1, common::g_config_value_.page_writer_max_point_num_);
+ time_page_row_ends.reserve(total_rows / page_max_points + 1);
+
+ // Write time and record where a time page is sealed.
+ for (uint32_t row = 0; row < total_rows; row++) {
+ const int32_t pages_before = time_chunk_writer->num_of_pages();
+ if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) {
+ return ret;
+ }
+ const int32_t pages_after = time_chunk_writer->num_of_pages();
+ if (pages_after > pages_before) {
+ const uint32_t boundary_end = row + 1;
+ if (time_page_row_ends.empty() ||
+ time_page_row_ends.back() != boundary_end) {
+ time_page_row_ends.push_back(boundary_end);
+ }
+ }
+ }
+
+ // Write values column-by-column and seal at recorded boundaries.
+ ASSERT(value_chunk_writers.size() == tablet.get_column_count());
+ for (uint32_t col = 0; col < value_chunk_writers.size(); col++) {
+ ValueChunkWriter* value_chunk_writer = value_chunk_writers[col];
if (IS_NULL(value_chunk_writer)) {
continue;
}
- if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0,
- tablet.get_cur_row_size()))) {
- return ret;
+ uint32_t seg_start = 0;
+ for (uint32_t boundary_end : time_page_row_ends) {
+ if (boundary_end <= seg_start) {
+ continue;
+ }
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet, col,
+ seg_start, boundary_end))) {
+ return ret;
+ }
+ if (value_chunk_writer->has_current_page_data() &&
+ RET_FAIL(value_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ seg_start = boundary_end;
+ }
+ if (seg_start < total_rows) {
+ if (RET_FAIL(value_write_column(value_chunk_writer, tablet, col,
+ seg_start, total_rows))) {
+ return ret;
+ }
}
}
return ret;
@@ -896,26 +1150,242 @@ int TsFileWriter::write_table(Tablet& tablet) {
value_chunk_writers))) {
return ret;
}
- for (int i = start_idx; i < end_idx; i++) {
- if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i])))
{
- return ret;
+
+ const bool strict_page_size =
+ common::g_config_value_.strict_page_size_;
+
+ std::vector<uint32_t> field_columns;
+ field_columns.reserve(tablet.get_column_count());
+ for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
+ if (tablet.column_categories_[col] ==
+ common::ColumnCategory::FIELD) {
+ field_columns.push_back(col);
}
}
- uint32_t field_col_count = 0;
- for (uint32_t i = 0; i < tablet.get_column_count(); ++i) {
- if (tablet.column_categories_[i] ==
- common::ColumnCategory::FIELD) {
+ ASSERT(field_columns.size() == value_chunk_writers.size());
+
+ const bool has_varlen_field_column = [&]() {
+ for (uint32_t i = 0; i < field_columns.size(); i++) {
+ const common::TSDataType t =
+ tablet.schema_vec_->at(field_columns[i]).data_type_;
+ if (t == common::STRING || t == common::TEXT ||
+ t == common::BLOB) {
+ return true;
+ }
+ }
+ return false;
+ }();
+
+ // Keep writers' seal-check behavior consistent across calls.
+ time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(
+ strict_page_size);
+ }
+ }
+
+ if (strict_page_size) {
+ // Strict: row-based insertion and force aligned page sealing
+ // when either time or any value page becomes full.
+ for (int i = start_idx; i < end_idx; i++) {
+ int32_t time_pages_before =
+ time_chunk_writer->num_of_pages();
+ std::vector<int32_t> value_pages_before(
+ value_chunk_writers.size(), 0);
+ for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+ if (!IS_NULL(value_chunk_writers[k])) {
+ value_pages_before[k] =
+ value_chunk_writers[k]->num_of_pages();
+ }
+ }
+
+ if (RET_FAIL(
+ time_chunk_writer->write(tablet.timestamps_[i]))) {
+ return ret;
+ }
+
+ for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+ ValueChunkWriter* value_chunk_writer =
+ value_chunk_writers[k];
+ if (IS_NULL(value_chunk_writer)) {
+ continue;
+ }
+ const uint32_t tablet_col_idx = field_columns[k];
+ if (RET_FAIL(value_write_column(value_chunk_writer,
+ tablet, tablet_col_idx,
+ i, i + 1))) {
+ return ret;
+ }
+ }
+
+ if (RET_FAIL(maybe_seal_aligned_pages_together(
+ time_chunk_writer, value_chunk_writers,
+ time_pages_before, value_pages_before))) {
+ return ret;
+ }
+ }
+ } else if (!has_varlen_field_column) {
+ // Optimization: no string/blob/text columns, so we can
+ // segment by point-number and seal pages at those boundaries
+ // in column-based order.
+ const uint32_t points_per_page =
+ common::g_config_value_.page_writer_max_point_num_;
+
+ time_chunk_writer->set_enable_page_seal_if_full(false);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(
+ false);
+ }
+ }
+
+ // Fill the already-unsealed time page first.
+ uint32_t time_cur_points =
time_chunk_writer->get_point_numer();
+ if (time_cur_points >= points_per_page &&
+ time_chunk_writer->has_current_page_data()) {
+ if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c]) &&
+ value_chunk_writers[c]->has_current_page_data()) {
+ if (RET_FAIL(value_chunk_writers[c]
+ ->seal_current_page())) {
+ return ret;
+ }
+ }
+ }
+ time_cur_points = 0;
+ }
+
+ const uint32_t first_seg_len =
+ (time_cur_points > 0 && time_cur_points < points_per_page)
+ ? (points_per_page - time_cur_points)
+ : points_per_page;
+
+ // 1) Write time in segments (seal all full segments).
+ uint32_t seg_start = static_cast<uint32_t>(start_idx);
+ uint32_t seg_len = first_seg_len;
+ while (static_cast<int>(seg_start) < end_idx) {
+ const uint32_t seg_end = std::min(
+ seg_start + seg_len, static_cast<uint32_t>(end_idx));
+ if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
+ seg_start, seg_end))) {
+ return ret;
+ }
+ seg_start = seg_end;
+ if (static_cast<int>(seg_start) < end_idx) {
+ if (RET_FAIL(time_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ seg_len = points_per_page;
+ }
+
+ // 2) Write each value column (same segments).
+ for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
ValueChunkWriter* value_chunk_writer =
- value_chunk_writers[field_col_count];
+ value_chunk_writers[k];
if (IS_NULL(value_chunk_writer)) {
continue;
}
+ seg_start = static_cast<uint32_t>(start_idx);
+ seg_len = first_seg_len;
+ while (static_cast<int>(seg_start) < end_idx) {
+ const uint32_t seg_end =
+ std::min(seg_start + seg_len,
+ static_cast<uint32_t>(end_idx));
+ if (RET_FAIL(value_write_column(
+ value_chunk_writer, tablet, field_columns[k],
+ seg_start, seg_end))) {
+ return ret;
+ }
+ seg_start = seg_end;
+ if (static_cast<int>(seg_start) < end_idx) {
+ if (value_chunk_writer->has_current_page_data() &&
+ RET_FAIL(
+ value_chunk_writer->seal_current_page())) {
+ return ret;
+ }
+ }
+ seg_len = points_per_page;
+ }
+ }
+ } else {
+ // General non-strict (may have varlen STRING/TEXT/BLOB
+ // columns): time auto-seals to provide aligned page
boundaries;
+ // value writers skip auto page sealing and are sealed manually
+ // at recorded time-page boundaries. Attention: since
value-side
+ // auto-seal is disabled, if a varlen value page hits the
memory
+ // threshold earlier, it may not seal immediately and will be
+ // sealed later at the time-page boundaries (non-strict
+ // sacrifices the strict page size/memory limit for
+ // performance).
+ time_chunk_writer->set_enable_page_seal_if_full(true);
+ for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
+ if (!IS_NULL(value_chunk_writers[c])) {
+ value_chunk_writers[c]->set_enable_page_seal_if_full(
+ false);
+ }
+ }
- if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
- i, start_idx, end_idx))) {
+ std::vector<uint32_t> time_page_row_ends;
+ const uint32_t page_max_points = std::max<uint32_t>(
+ 1, common::g_config_value_.page_writer_max_point_num_);
+ const uint32_t batch_rows =
+ static_cast<uint32_t>(end_idx - start_idx);
+ time_page_row_ends.reserve(batch_rows / page_max_points + 1);
+ for (uint32_t r = static_cast<uint32_t>(start_idx);
+ r < static_cast<uint32_t>(end_idx); r++) {
+ const int32_t pages_before =
+ time_chunk_writer->num_of_pages();
+ if (RET_FAIL(
+ time_chunk_writer->write(tablet.timestamps_[r]))) {
return ret;
}
- field_col_count++;
+ const int32_t pages_after =
+ time_chunk_writer->num_of_pages();
+ if (pages_after > pages_before) {
+ const uint32_t boundary_end = r + 1;
+ if (time_page_row_ends.empty() ||
+ time_page_row_ends.back() != boundary_end) {
+ time_page_row_ends.push_back(boundary_end);
+ }
+ }
+ }
+
+ // Write values column-by-column and seal at recorded time
+ // boundaries.
+ for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
+ ValueChunkWriter* value_chunk_writer =
+ value_chunk_writers[k];
+ if (IS_NULL(value_chunk_writer)) {
+ continue;
+ }
+ uint32_t seg_start = static_cast<uint32_t>(start_idx);
+ for (uint32_t boundary_end : time_page_row_ends) {
+ if (boundary_end <= seg_start) {
+ continue;
+ }
+ if (RET_FAIL(value_write_column(
+ value_chunk_writer, tablet, field_columns[k],
+ seg_start, boundary_end))) {
+ return ret;
+ }
+ if (value_chunk_writer->has_current_page_data() &&
+ RET_FAIL(value_chunk_writer->seal_current_page()))
{
+ return ret;
+ }
+ seg_start = boundary_end;
+ }
+ if (seg_start < static_cast<uint32_t>(end_idx)) {
+ if (RET_FAIL(value_write_column(
+ value_chunk_writer, tablet, field_columns[k],
+ seg_start, static_cast<uint32_t>(end_idx)))) {
+ return ret;
+ }
+ }
}
}
start_idx = end_idx;
diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h
index ec8fe3f44..ff7cdbac2 100644
--- a/cpp/src/writer/tsfile_writer.h
+++ b/cpp/src/writer/tsfile_writer.h
@@ -46,6 +46,7 @@ extern int libtsfile_init();
extern void libtsfile_destroy();
extern void set_page_max_point_count(uint32_t page_max_ponint_count);
extern void set_max_degree_of_index_node(uint32_t max_degree_of_index_node);
+extern void set_strict_page_size(bool strict_page_size);
class TsFileWriter {
public:
@@ -116,6 +117,11 @@ class TsFileWriter {
int write_point_aligned(ValueChunkWriter* value_chunk_writer,
int64_t timestamp, common::TSDataType data_type,
const DataPoint& point);
+ int maybe_seal_aligned_pages_together(
+ TimeChunkWriter* time_chunk_writer,
+ common::SimpleVector<ValueChunkWriter*>& value_chunk_writers,
+ int32_t time_pages_before,
+ const std::vector<int32_t>& value_pages_before);
int flush_chunk_group(MeasurementSchemaGroup* chunk_group, bool
is_aligned);
int write_typed_column(storage::ChunkWriter* chunk_writer,
diff --git a/cpp/src/writer/value_chunk_writer.cc
b/cpp/src/writer/value_chunk_writer.cc
index e4bb52658..a59cf8d3f 100644
--- a/cpp/src/writer/value_chunk_writer.cc
+++ b/cpp/src/writer/value_chunk_writer.cc
@@ -110,7 +110,7 @@ int ValueChunkWriter::seal_cur_page(bool end_chunk) {
/*stat*/ false, /*data*/ false);
if (IS_SUCC(ret)) {
save_first_page_data(value_page_writer_);
- // value_page_writer_.destroy_page_data();
+ value_page_writer_.clear_page_data();
value_page_writer_.reset();
}
}
@@ -161,7 +161,8 @@ int ValueChunkWriter::write_first_page_data(ByteStream&
pages_data,
int ValueChunkWriter::end_encode_chunk() {
int ret = E_OK;
- if (value_page_writer_.get_statistic()->count_ > 0) {
+ if (value_page_writer_.get_point_numer() > 0 ||
+ (has_current_page_data() && num_of_pages_ == 0)) {
ret = seal_cur_page(/*end_chunk*/ true);
if (E_OK == ret) {
chunk_header_.data_size_ = chunk_data_.total_size();
@@ -174,6 +175,9 @@ int ValueChunkWriter::end_encode_chunk() {
chunk_header_.data_size_ = chunk_data_.total_size();
chunk_header_.num_of_pages_ = num_of_pages_;
}
+ } else if (num_of_pages_ > 0) {
+ chunk_header_.data_size_ = chunk_data_.total_size();
+ chunk_header_.num_of_pages_ = num_of_pages_;
}
#if DEBUG_SE
std::cout << "end_encode_chunk: num_of_pages_=" << num_of_pages_
@@ -193,9 +197,7 @@ int64_t ValueChunkWriter::estimate_max_series_mem_size() {
}
bool ValueChunkWriter::hasData() {
- return num_of_pages_ > 0 ||
- (value_page_writer_.get_statistic() != nullptr &&
- value_page_writer_.get_statistic()->count_ > 0);
+ return num_of_pages_ > 0 || has_current_page_data();
}
} // end namespace storage
diff --git a/cpp/src/writer/value_chunk_writer.h
b/cpp/src/writer/value_chunk_writer.h
index 4391b7540..64eb4cc50 100644
--- a/cpp/src/writer/value_chunk_writer.h
+++ b/cpp/src/writer/value_chunk_writer.h
@@ -53,7 +53,8 @@ class ValueChunkWriter {
first_page_data_(),
first_page_statistic_(nullptr),
chunk_header_(),
- num_of_pages_(0) {}
+ num_of_pages_(0),
+ enable_page_seal_if_full_(true) {}
~ValueChunkWriter() { destroy(); }
int init(const common::ColumnSchema& col_schema);
int init(const std::string& measurement_name, common::TSDataType data_type,
@@ -118,6 +119,29 @@ class ValueChunkWriter {
bool hasData();
+ /** True if the current (unsealed) page has at least one write (including
+ * nulls). */
+ bool has_current_page_data() const {
+ return value_page_writer_.get_total_write_count() > 0;
+ }
+
+ FORCE_INLINE uint32_t get_point_numer() const {
+ return value_page_writer_.get_point_numer();
+ }
+
+ /**
+ * Force seal the current page (for aligned table model: when time page
+ * seals due to memory/point threshold, all value pages must seal
together).
+ * @return E_OK on success.
+ */
+ int seal_current_page() { return seal_cur_page(false); }
+
+ // For aligned writer: allow disabling the automatic page-size/point-number
+ // check so the caller can seal pages at chosen boundaries.
+ FORCE_INLINE void set_enable_page_seal_if_full(bool enable) {
+ enable_page_seal_if_full_ = enable;
+ }
+
private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
@@ -127,6 +151,9 @@ class ValueChunkWriter {
common::g_config_value_.page_writer_max_memory_bytes_);
}
FORCE_INLINE int seal_cur_page_if_full() {
+ if (UNLIKELY(!enable_page_seal_if_full_)) {
+ return common::E_OK;
+ }
if (UNLIKELY(is_cur_page_full())) {
return seal_cur_page(false);
}
@@ -156,6 +183,8 @@ class ValueChunkWriter {
ChunkHeader chunk_header_;
int32_t num_of_pages_;
+ // If false, write() won't auto-seal when the current page becomes full.
+ bool enable_page_seal_if_full_;
};
} // end namespace storage
diff --git a/cpp/src/writer/value_page_writer.cc
b/cpp/src/writer/value_page_writer.cc
index feedb1870..1c8f05350 100644
--- a/cpp/src/writer/value_page_writer.cc
+++ b/cpp/src/writer/value_page_writer.cc
@@ -43,7 +43,7 @@ int ValuePageData::init(ByteStream& col_notnull_bitmap_bs,
ByteStream& value_bs,
if (IS_NULL(uncompressed_buf_)) {
return E_OOM;
}
- if (col_notnull_bitmap_buf_size_ == 0 || value_buf_size_ == 0) {
+ if (col_notnull_bitmap_buf_size_ == 0) {
return E_INVALID_ARG;
}
uncompressed_buf_[0] = (unsigned char)((size >> 24) & 0xFF);
@@ -54,11 +54,11 @@ int ValuePageData::init(ByteStream& col_notnull_bitmap_bs,
ByteStream& value_bs,
if (RET_FAIL(common::copy_bs_to_buf(col_notnull_bitmap_bs,
uncompressed_buf_ + sizeof(size),
col_notnull_bitmap_buf_size_))) {
- } else if (RET_FAIL(common::copy_bs_to_buf(value_bs,
- uncompressed_buf_ +
- sizeof(size) +
-
col_notnull_bitmap_buf_size_,
- value_buf_size_))) {
+ } else if (value_buf_size_ > 0 && RET_FAIL(common::copy_bs_to_buf(
+ value_bs,
+ uncompressed_buf_ + sizeof(size) +
+ col_notnull_bitmap_buf_size_,
+ value_buf_size_))) {
} else {
// TODO
// NOTE: different compressor may have different compress API
diff --git a/cpp/src/writer/value_page_writer.h
b/cpp/src/writer/value_page_writer.h
index ec115c9da..ef694693b 100644
--- a/cpp/src/writer/value_page_writer.h
+++ b/cpp/src/writer/value_page_writer.h
@@ -51,7 +51,6 @@ struct ValuePageData {
common::ByteStream& value_bs, Compressor* compressor,
uint32_t size);
void destroy() {
- // Be careful about the memory
if (uncompressed_buf_ != nullptr) {
common::mem_free(uncompressed_buf_);
uncompressed_buf_ = nullptr;
@@ -60,6 +59,19 @@ struct ValuePageData {
compressor_->after_compress(compressed_buf_);
compressed_buf_ = nullptr;
}
+ compressor_ = nullptr;
+ }
+
+ /** Clear pointers without freeing (transfer ownership to another holder).
+ */
+ void clear() {
+ col_notnull_bitmap_buf_size_ = 0;
+ value_buf_size_ = 0;
+ uncompressed_size_ = 0;
+ compressed_size_ = 0;
+ uncompressed_buf_ = nullptr;
+ compressed_buf_ = nullptr;
+ compressor_ = nullptr;
}
};
@@ -152,6 +164,7 @@ class ValuePageWriter {
}
FORCE_INLINE uint32_t get_point_numer() const { return statistic_->count_;
}
+ FORCE_INLINE uint32_t get_total_write_count() const { return size_; }
FORCE_INLINE uint32_t get_col_notnull_bitmap_out_stream_size() const {
return col_notnull_bitmap_out_stream_.total_size();
}
@@ -183,6 +196,8 @@ class ValuePageWriter {
FORCE_INLINE Statistic* get_statistic() { return statistic_; }
ValuePageData get_cur_page_data() { return cur_page_data_; }
void destroy_page_data() { cur_page_data_.destroy(); }
+ /** Clear cur_page_data_ without freeing (after ownership transferred). */
+ void clear_page_data() { cur_page_data_.clear(); }
private:
FORCE_INLINE int prepare_end_page() {
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index b9f0eb213..4b1a8259f 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -216,6 +216,21 @@ TEST_F(TsFileTableReaderTest, TableModelQueryOneSmallPage)
{
g_config_value_.page_writer_max_point_num_ = prev_config;
}
+// Triggers memory-based seal in aligned table: time page seals by size while
+// value pages may not; ensure value pages are sealed together with time (no
+// time-page-sealed / value-page-not-sealed inconsistency).
+// Use 512 bytes so time seals by size before point count; 128 was too small
+// and could produce misaligned time/value pages on some encodings.
+TEST_F(TsFileTableReaderTest, TableModelQueryMemoryBasedSeal) {
+ uint32_t prev_point_num = g_config_value_.page_writer_max_point_num_;
+ uint32_t prev_mem_bytes = g_config_value_.page_writer_max_memory_bytes_;
+ g_config_value_.page_writer_max_point_num_ = 10000;
+ g_config_value_.page_writer_max_memory_bytes_ = 512;
+ test_table_model_query(50, 1);
+ g_config_value_.page_writer_max_point_num_ = prev_point_num;
+ g_config_value_.page_writer_max_memory_bytes_ = prev_mem_bytes;
+}
+
TEST_F(TsFileTableReaderTest, TableModelQueryOneLargePage) {
int prev_config = g_config_value_.page_writer_max_point_num_;
g_config_value_.page_writer_max_point_num_ = 10000;
diff --git a/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
b/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
index 13a0257d3..8003326e9 100644
--- a/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
+++ b/cpp/test/reader/table_view/tsfile_table_query_by_row_test.cc
@@ -651,7 +651,7 @@ TEST_F(TableQueryByRowTest,
DenseSingleDeviceSsiLevelPushdown) {
// Pushdown is faster than full query + manual next: queryByRow(offset, limit)
// skips at device/SSI/Chunk level; old query then manual next decodes every
-// row. Timing tolerance 5% to allow measurement noise.
+// row. Timing tolerance 20% to allow measurement noise.
TEST_F(TableQueryByRowTest, QueryByRowFasterThanManualNext) {
const int num_rows = 8000;
const int offset = 3000;
@@ -659,7 +659,7 @@ TEST_F(TableQueryByRowTest, QueryByRowFasterThanManualNext)
{
write_single_device_file(num_rows);
const int num_iters = 5;
- const double tolerance = 0.1; // 10% tolerance to allow for timing noise
+ const double tolerance = 0.2;
auto run_query_by_row = [this, offset, limit]() {
TsFileReader reader;
diff --git a/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc
b/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc
index 56f8c113a..1643303df 100644
--- a/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc
+++ b/cpp/test/reader/tree_view/tsfile_tree_query_by_row_test.cc
@@ -1102,7 +1102,7 @@ TEST_F(TreeQueryByRowTest,
MultiPath_TimeHint_SkipsStaleChunk_WithOffset) {
// Pushdown is faster than full query + manual next: queryByRow(offset, limit)
// skips at Chunk/Page level; old query then manual next decodes every row.
-// Timing tolerance 5% to allow measurement noise.
+// Timing tolerance 20% to allow measurement noise.
TEST_F(TreeQueryByRowTest, QueryByRowFasterThanManualNext) {
std::vector<std::string> devices = {"d1"};
std::vector<std::string> measurements = {"s1"};
@@ -1112,7 +1112,7 @@ TEST_F(TreeQueryByRowTest,
QueryByRowFasterThanManualNext) {
write_test_file(devices, measurements, num_rows);
const int num_iters = 5;
- const double tolerance = 0.05;
+ const double tolerance = 0.2;
auto run_query_by_row = [this, &devices, &measurements, offset, limit]() {
TsFileTreeReader reader;
diff --git a/cpp/test/writer/tsfile_writer_test.cc
b/cpp/test/writer/tsfile_writer_test.cc
index 30fded6eb..285d926b1 100644
--- a/cpp/test/writer/tsfile_writer_test.cc
+++ b/cpp/test/writer/tsfile_writer_test.cc
@@ -808,6 +808,241 @@ TEST_F(TsFileWriterTest, WriteAlignedTimeseries) {
reader.destroy_query_data_set(qds);
}
+/*
+ * Aligned page seal synchronization tests.
+ *
+ * In the aligned model, time page and every value page must seal together
+ * so that each chunk has the same number of pages. Without synchronization,
+ * a threshold hit on one page (point-count or memory) would seal only that
+ * page, producing misaligned page counts and corrupt reads.
+ *
+ * Three sub-cases:
+ * 1. Time page reaches point-count threshold first; value pages have
+ * partial nulls so their non-null statistic count is lower and they
+ * would NOT seal on their own.
+ * 2. Time page reaches memory threshold first; value pages are mostly
+ * null so their encoded-data memory is much smaller.
+ * 3. A value page (STRING, large per-row memory) reaches memory
+ * threshold first; time page and other value pages have not.
+ */
+
+// Case 1: time page seals by point-count; value pages with partial nulls
+// have fewer non-null points (statistic count) and would not self-seal.
+// Sync mechanism must force all value pages to seal together.
+TEST_F(TsFileWriterTest, AlignedSealSync_PointCountWithNulls) {
+ uint32_t prev_pt = g_config_value_.page_writer_max_point_num_;
+ uint32_t prev_mem = g_config_value_.page_writer_max_memory_bytes_;
+ struct Guard {
+ uint32_t pt, mem;
+ ~Guard() {
+ g_config_value_.page_writer_max_point_num_ = pt;
+ g_config_value_.page_writer_max_memory_bytes_ = mem;
+ }
+ } guard{prev_pt, prev_mem};
+ g_config_value_.page_writer_max_point_num_ = 10;
+ g_config_value_.page_writer_max_memory_bytes_ = 1024 * 1024;
+
+ std::string device_name = "device_pt_null";
+ std::vector<std::string> mnames = {"s0", "s1", "s2"};
+ std::vector<MeasurementSchema*> schemas;
+ for (auto& n : mnames) {
+ schemas.push_back(new MeasurementSchema(n, INT64, PLAIN,
UNCOMPRESSED));
+ }
+ tsfile_writer_->register_aligned_timeseries(device_name, schemas);
+
+ // s0: always non-null -> 10 non-null per 10-row page, self-seals
+ // s1: null on even rows -> 5 non-null per page, won't self-seal
+ // s2: null except every 5th row -> 2 non-null per page, won't self-seal
+ int row_num = 30;
+ for (int i = 0; i < row_num; ++i) {
+ TsRecord record(1622505600000 + i, device_name);
+ record.add_point(mnames[0], static_cast<int64_t>(i));
+ if (i % 2 != 0) {
+ record.add_point(mnames[1], static_cast<int64_t>(i * 10));
+ } else {
+ record.points_.emplace_back(DataPoint(mnames[1]));
+ }
+ if (i % 5 == 0) {
+ record.add_point(mnames[2], static_cast<int64_t>(i * 100));
+ } else {
+ record.points_.emplace_back(DataPoint(mnames[2]));
+ }
+ ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK);
+ }
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
+ std::vector<storage::Path> select_list;
+ for (auto& n : mnames) {
+ select_list.emplace_back(device_name, n);
+ }
+ storage::QueryExpression* qe =
+ storage::QueryExpression::create(select_list, nullptr);
+ storage::TsFileReader reader;
+ ASSERT_EQ(reader.open(file_name_), E_OK);
+ storage::ResultSet* tmp_qds = nullptr;
+ ASSERT_EQ(reader.query(qe, tmp_qds), E_OK);
+ auto* qds = (QDSWithoutTimeGenerator*)tmp_qds;
+
+ bool has_next = false;
+ int64_t cur_row = 0;
+ while (IS_SUCC(qds->next(has_next)) && has_next) {
+ auto* rec = qds->get_row_record();
+ ASSERT_NE(rec, nullptr);
+ EXPECT_EQ(rec->get_timestamp(), 1622505600000 + cur_row);
+ EXPECT_EQ(field_to_string(rec->get_field(1)), std::to_string(cur_row));
+ if (cur_row % 2 != 0) {
+ EXPECT_EQ(field_to_string(rec->get_field(2)),
+ std::to_string(cur_row * 10));
+ }
+ if (cur_row % 5 == 0) {
+ EXPECT_EQ(field_to_string(rec->get_field(3)),
+ std::to_string(cur_row * 100));
+ }
+ cur_row++;
+ }
+ EXPECT_EQ(cur_row, row_num);
+ reader.destroy_query_data_set(qds);
+ ASSERT_EQ(reader.close(), E_OK);
+}
+
+// Case 2: time page seals by memory threshold first. Value pages are mostly
+// null so their encoded-value memory grows much slower than the time page
+// (INT64 PLAIN = 8 bytes/point). Time page hits 512 bytes at ~64 points;
+// value pages with 1 non-null every 20 rows only have ~24 bytes of value
+// data at that point. Sync must force all value pages to seal.
+TEST_F(TsFileWriterTest, AlignedSealSync_TimeMemoryFirst) {
+ uint32_t prev_pt = g_config_value_.page_writer_max_point_num_;
+ uint32_t prev_mem = g_config_value_.page_writer_max_memory_bytes_;
+ struct Guard {
+ uint32_t pt, mem;
+ ~Guard() {
+ g_config_value_.page_writer_max_point_num_ = pt;
+ g_config_value_.page_writer_max_memory_bytes_ = mem;
+ }
+ } guard{prev_pt, prev_mem};
+ g_config_value_.page_writer_max_point_num_ = 10000;
+ g_config_value_.page_writer_max_memory_bytes_ = 512;
+
+ std::string device_name = "device_time_mem";
+ std::vector<std::string> mnames = {"s0", "s1"};
+ std::vector<MeasurementSchema*> schemas;
+ for (auto& n : mnames) {
+ schemas.push_back(new MeasurementSchema(n, INT64, PLAIN,
UNCOMPRESSED));
+ }
+ tsfile_writer_->register_aligned_timeseries(device_name, schemas);
+
+ int row_num = 200;
+ for (int i = 0; i < row_num; ++i) {
+ TsRecord record(1622505600000 + i, device_name);
+ if (i % 20 == 0) {
+ record.add_point(mnames[0], static_cast<int64_t>(i));
+ record.add_point(mnames[1], static_cast<int64_t>(i * 10));
+ } else {
+ record.points_.emplace_back(DataPoint(mnames[0]));
+ record.points_.emplace_back(DataPoint(mnames[1]));
+ }
+ ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK);
+ }
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
+ std::vector<storage::Path> select_list;
+ for (auto& n : mnames) {
+ select_list.emplace_back(device_name, n);
+ }
+ storage::QueryExpression* qe =
+ storage::QueryExpression::create(select_list, nullptr);
+ storage::TsFileReader reader;
+ ASSERT_EQ(reader.open(file_name_), E_OK);
+ storage::ResultSet* tmp_qds = nullptr;
+ ASSERT_EQ(reader.query(qe, tmp_qds), E_OK);
+ auto* qds = (QDSWithoutTimeGenerator*)tmp_qds;
+
+ bool has_next = false;
+ int64_t cur_row = 0;
+ while (IS_SUCC(qds->next(has_next)) && has_next) {
+ auto* rec = qds->get_row_record();
+ ASSERT_NE(rec, nullptr);
+ EXPECT_EQ(rec->get_timestamp(), 1622505600000 + cur_row);
+ if (cur_row % 20 == 0) {
+ EXPECT_EQ(field_to_string(rec->get_field(1)),
+ std::to_string(cur_row));
+ EXPECT_EQ(field_to_string(rec->get_field(2)),
+ std::to_string(cur_row * 10));
+ }
+ cur_row++;
+ }
+ EXPECT_EQ(cur_row, row_num);
+ reader.destroy_query_data_set(qds);
+ ASSERT_EQ(reader.close(), E_OK);
+}
+
+// Case 3: a value page (STRING type, ~104 bytes/point with PLAIN encoding)
+// seals by memory threshold before the time page (INT64, 8 bytes/point).
+// With threshold=512, STRING value page seals at ~5 points while time page
+// only has ~40 bytes. Sync must force time page and other value pages to seal.
+TEST_F(TsFileWriterTest, AlignedSealSync_ValueMemoryFirst) {
+ uint32_t prev_pt = g_config_value_.page_writer_max_point_num_;
+ uint32_t prev_mem = g_config_value_.page_writer_max_memory_bytes_;
+ struct Guard {
+ uint32_t pt, mem;
+ ~Guard() {
+ g_config_value_.page_writer_max_point_num_ = pt;
+ g_config_value_.page_writer_max_memory_bytes_ = mem;
+ }
+ } guard{prev_pt, prev_mem};
+ g_config_value_.page_writer_max_point_num_ = 10000;
+ g_config_value_.page_writer_max_memory_bytes_ = 512;
+
+ std::string device_name = "device_val_mem";
+ std::vector<MeasurementSchema*> schemas;
+ schemas.push_back(new MeasurementSchema("s0", INT64, PLAIN, UNCOMPRESSED));
+ schemas.push_back(new MeasurementSchema("s1", STRING, PLAIN,
UNCOMPRESSED));
+ tsfile_writer_->register_aligned_timeseries(device_name, schemas);
+
+ char* long_buf = new char[101];
+ memset(long_buf, 'A', 100);
+ long_buf[100] = '\0';
+ common::String str_val(long_buf, 100);
+
+ int row_num = 100;
+ for (int i = 0; i < row_num; ++i) {
+ TsRecord record(1622505600000 + i, device_name);
+ record.add_point(std::string("s0"), static_cast<int64_t>(i));
+ record.add_point(std::string("s1"), str_val);
+ ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK);
+ }
+ delete[] long_buf;
+ ASSERT_EQ(tsfile_writer_->flush(), E_OK);
+ ASSERT_EQ(tsfile_writer_->close(), E_OK);
+
+ std::string s0("s0"), s1("s1");
+ std::vector<storage::Path> select_list;
+ select_list.emplace_back(device_name, s0);
+ select_list.emplace_back(device_name, s1);
+ storage::QueryExpression* qe =
+ storage::QueryExpression::create(select_list, nullptr);
+ storage::TsFileReader reader;
+ ASSERT_EQ(reader.open(file_name_), E_OK);
+ storage::ResultSet* tmp_qds = nullptr;
+ ASSERT_EQ(reader.query(qe, tmp_qds), E_OK);
+ auto* qds = (QDSWithoutTimeGenerator*)tmp_qds;
+
+ bool has_next = false;
+ int64_t cur_row = 0;
+ while (IS_SUCC(qds->next(has_next)) && has_next) {
+ auto* rec = qds->get_row_record();
+ ASSERT_NE(rec, nullptr);
+ EXPECT_EQ(rec->get_timestamp(), 1622505600000 + cur_row);
+ EXPECT_EQ(field_to_string(rec->get_field(1)), std::to_string(cur_row));
+ cur_row++;
+ }
+ EXPECT_EQ(cur_row, row_num);
+ reader.destroy_query_data_set(qds);
+ ASSERT_EQ(reader.close(), E_OK);
+}
+
TEST_F(TsFileWriterTest, WriteAlignedMultiFlush) {
int measurement_num = 100, row_num = 100;
std::string device_name = "device";