This is an automated email from the ASF dual-hosted git repository.
colinlee pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new e2fba469 Fix data loss (#771)
e2fba469 is described below
commit e2fba469368d0847af7292b5d40a0d1e910ff07e
Author: Colin Lee <[email protected]>
AuthorDate: Thu Apr 9 18:03:26 2026 +0800
Fix data loss (#771)
* fix data loss.
* fix format.
---
cpp/src/reader/aligned_chunk_reader.cc | 18 ++---
.../reader/table_view/tsfile_reader_table_test.cc | 83 ++++++++++++++++++++++
2 files changed, 93 insertions(+), 8 deletions(-)
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 2d117b1c..955715d4 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -550,12 +550,13 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
((value_page_col_notnull_bitmap_[cur_value_index / 8] & \
0xFF) & \
(mask >> (cur_value_index % 8))) == 0) { \
- ret = time_decoder_->read_int64(time, time_in); \
- if (ret != E_OK) { \
- break; \
- } \
if (UNLIKELY(!row_appender.add_row())) { \
ret = E_OVERFLOW; \
+ cur_value_index--; \
+ break; \
+ } \
+ ret = time_decoder_->read_int64(time, time_in); \
+ if (ret != E_OK) { \
break; \
} \
row_appender.append(0, (char*)&time, sizeof(time)); \
@@ -596,12 +597,13 @@ int AlignedChunkReader::i32_DECODE_TYPED_TV_INTO_TSBLOCK(
if (value_page_col_notnull_bitmap_.empty() ||
((value_page_col_notnull_bitmap_[cur_value_index / 8] & 0xFF) &
(mask >> (cur_value_index % 8))) == 0) {
- ret = time_decoder_->read_int64(time, time_in);
- if (ret != E_OK) {
- break;
- }
if (UNLIKELY(!row_appender.add_row())) {
ret = E_OVERFLOW;
+ cur_value_index--;
+ break;
+ }
+ ret = time_decoder_->read_int64(time, time_in);
+ if (ret != E_OK) {
break;
}
row_appender.append(0, (char*)&time, sizeof(time));
diff --git a/cpp/test/reader/table_view/tsfile_reader_table_test.cc b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
index 4b1a8259..1f63573e 100644
--- a/cpp/test/reader/table_view/tsfile_reader_table_test.cc
+++ b/cpp/test/reader/table_view/tsfile_reader_table_test.cc
@@ -803,3 +803,86 @@ TEST_F(TsFileTableReaderTest, TestTimeColumnReader) {
reader.destroy_query_data_set(table_result_set);
ASSERT_EQ(reader.close(), common::E_OK);
}
+
+// Regression test: AlignedChunkReader NULL branch overflow drops rows.
+// When a TsBlock is full (block_size=1024) and the next row to decode is a
+// NULL value in aligned data, the old code consumed the timestamp before
+// checking add_row(), silently losing that row on E_OVERFLOW.
+TEST_F(TsFileTableReaderTest, AlignedNullAtBlockBoundaryNoRowLoss) {
+ // block_size in RETURN_ROW mode is 1024.
+ const int32_t block_size = 1024;
+ // Write enough rows so that overflow happens multiple times,
+ // and place NULLs exactly at every block boundary.
+ const int32_t total_rows = block_size * 4; // 4096 rows
+
+ std::string table_name = "null_boundary";
+ auto* schema = new storage::TableSchema(
+ table_name,
+ {
+ common::ColumnSchema("tag1", common::TSDataType::STRING,
+ common::ColumnCategory::TAG),
+ // s_nullable: NULL at every block_size boundary
+ common::ColumnSchema("s_nullable", common::TSDataType::INT64,
+ common::ColumnCategory::FIELD),
+ // s_full: always has a value (control group)
+ common::ColumnSchema("s_full", common::TSDataType::INT64,
+ common::ColumnCategory::FIELD),
+ });
+
+ auto* writer =
+ new storage::TsFileTableWriter(&write_file_, schema, 128 * 1024 * 1024);
+
+ storage::Tablet tablet(
+ {"tag1", "s_nullable", "s_full"},
+ {common::TSDataType::STRING, common::TSDataType::INT64,
+ common::TSDataType::INT64},
+ total_rows);
+
+ for (int32_t i = 0; i < total_rows; i++) {
+ tablet.add_timestamp(i, static_cast<int64_t>(i));
+ tablet.add_value(i, "tag1", "device0");
+ tablet.add_value(i, "s_full", static_cast<int64_t>(i));
+ // Make row at every block_size boundary NULL for s_nullable.
+ // These are exactly the rows that trigger E_OVERFLOW in the decoder.
+ if (i % block_size != 0) {
+ tablet.add_value(i, "s_nullable", static_cast<int64_t>(i));
+ }
+ // else: s_nullable is NULL at i=0, 1024, 2048, 3072
+ }
+
+ ASSERT_EQ(writer->write_table(tablet), common::E_OK);
+ ASSERT_EQ(writer->flush(), common::E_OK);
+ ASSERT_EQ(writer->close(), common::E_OK);
+ delete writer;
+ delete schema;
+
+ storage::TsFileReader reader;
+ ASSERT_EQ(reader.open(file_name_), common::E_OK);
+
+ // Helper: query a single column and count rows.
+ auto count_rows = [&](const std::string& col) -> int64_t {
+ storage::ResultSet* rs = nullptr;
+ int ret = reader.query(table_name, {col}, 0, INT64_MAX, rs);
+ EXPECT_EQ(ret, common::E_OK);
+ if (rs == nullptr) return -1;
+ auto* trs = dynamic_cast<storage::TableResultSet*>(rs);
+ bool hn = false;
+ int64_t cnt = 0;
+ while (trs->next(hn) == common::E_OK && hn) {
+ cnt++;
+ }
+ reader.destroy_query_data_set(rs);
+ return cnt;
+ };
+
+ int64_t full_rows = count_rows("s_full");
+ int64_t nullable_rows = count_rows("s_nullable");
+
+ // Both columns must return the same number of rows.
+ // Before the fix, s_nullable would lose one row per overflow at a NULL
+ // boundary, yielding fewer rows than s_full.
+ ASSERT_EQ(full_rows, total_rows);
+ ASSERT_EQ(nullable_rows, total_rows);
+
+ ASSERT_EQ(reader.close(), common::E_OK);
+}
\ No newline at end of file