This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/tsfile.git
The following commit(s) were added to refs/heads/develop by this push:
new d2a906ec Fixes the issue where data with only timestamps could not be inserted (#525)
d2a906ec is described below
commit d2a906ec88f8fe557bf0f6cba742e03348992d1d
Author: Colin Lee <[email protected]>
AuthorDate: Fri Jun 27 17:01:09 2025 +0800
Fixes the issue where data with only timestamps could not be inserted (#525)
* Fixes the issue where data with only timestamps could not be inserted or queried.
* fix issue.
* fix fmt.
---
cpp/src/common/tsblock/tsblock.h | 14 ++-
cpp/src/common/tsblock/tuple_desc.h | 5 +
cpp/src/reader/aligned_chunk_reader.cc | 21 ++--
.../reader/block/single_device_tsblock_reader.cc | 34 +++++--
cpp/src/reader/table_result_set.cc | 3 +-
cpp/src/writer/time_chunk_writer.cc | 5 +
cpp/src/writer/time_chunk_writer.h | 4 +
cpp/src/writer/tsfile_writer.cc | 8 +-
.../writer/table_view/tsfile_writer_table_test.cc | 109 ++++++++++++++++++---
9 files changed, 166 insertions(+), 37 deletions(-)
diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h
index 4316e8f6..a0e94391 100644
--- a/cpp/src/common/tsblock/tsblock.h
+++ b/cpp/src/common/tsblock/tsblock.h
@@ -211,8 +211,7 @@ class ColAppender {
}
return E_OK;
}
- FORCE_INLINE int fill(const char *value, uint32_t len,
- uint32_t end_index) {
+ FORCE_INLINE int fill(const char *value, uint32_t len, uint32_t end_index) {
while (column_row_count_ < end_index) {
if (!add_row()) {
return E_INVALID_ARG;
@@ -258,6 +257,13 @@ class RowIterator {
}
}
+ FORCE_INLINE void next(size_t ind) const {
+ ASSERT(row_id_ < tsblock_->row_count_);
+ tsblock_->vectors_[ind]->update_offset();
+ }
+
+ FORCE_INLINE void update_row_id() { row_id_++; }
+
FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len,
bool *__restrict null) {
ASSERT(column_index < column_count_);
@@ -287,8 +293,10 @@ class ColIterator {
FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; }
FORCE_INLINE void next() {
+ if (!vec_->is_null(row_id_)) {
+ vec_->update_offset();
+ }
++row_id_;
- vec_->update_offset();
}
FORCE_INLINE bool has_null() { return vec_->has_null(); }
diff --git a/cpp/src/common/tsblock/tuple_desc.h b/cpp/src/common/tsblock/tuple_desc.h
index 98bd341d..85ba1309 100644
--- a/cpp/src/common/tsblock/tuple_desc.h
+++ b/cpp/src/common/tsblock/tuple_desc.h
@@ -71,6 +71,11 @@ class TupleDesc {
return column_list_[index].data_type_;
}
+ FORCE_INLINE common::ColumnCategory get_column_category(
+ const uint32_t index) const {
+ return column_list_[index].column_category_;
+ }
+
FORCE_INLINE std::string get_column_name(uint32_t index) {
return column_list_[index].column_name_;
}
diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc
index 230661f3..5e1bbe43 100644
--- a/cpp/src/reader/aligned_chunk_reader.cc
+++ b/cpp/src/reader/aligned_chunk_reader.cc
@@ -302,7 +302,8 @@ int AlignedChunkReader::read_from_file_and_rewrap(
int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset;
int read_size =
(want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size);
- if (file_data_buf_size < read_size || (may_shrink && read_size < file_data_buf_size / 10)) {
+ if (file_data_buf_size < read_size ||
+ (may_shrink && read_size < file_data_buf_size / 10)) {
file_data_buf = (char *)mem_realloc(file_data_buf, read_size);
if (IS_NULL(file_data_buf)) {
return E_OOM;
@@ -366,7 +367,6 @@ int AlignedChunkReader::decode_cur_time_page_data() {
uint32_t time_compressed_buf_size = 0;
uint32_t time_uncompressed_buf_size = 0;
-
// Step 2: do uncompress
if (IS_SUCC(ret)) {
time_compressed_buf =
@@ -519,9 +519,9 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
uint32_t mask = 1 << 7; \
int64_t time = 0; \
CppType value; \
- while ((time_decoder_->has_remaining() || time_in.has_remaining()) \
- && (value_decoder_->has_remaining() || \
- value_in.has_remaining())){ \
+ while ( \
+ (time_decoder_->has_remaining() || time_in.has_remaining()) && \
+ (value_decoder_->has_remaining() || value_in.has_remaining())) { \
cur_value_index++; \
if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \
0xFF) & \
@@ -530,16 +530,17 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
if (ret != E_OK) { \
break; \
} \
- ret = value_decoder_->read_##ReadType(value, \
- value_in); \
- if (ret != E_OK) { \
+ if (UNLIKELY(!row_appender.add_row())) { \
+ ret = E_OVERFLOW; \
break; \
} \
+ row_appender.append(0, (char *)&time, sizeof(time)); \
+ row_appender.append_null(1); \
continue; \
} \
if (UNLIKELY(!row_appender.add_row())) { \
ret = E_OVERFLOW; \
- cur_value_index--; \
+ cur_value_index--; \
break; \
} else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \
} else if (RET_FAIL(value_decoder_->read_##ReadType(value, \
@@ -549,7 +550,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock(
continue; \
} else { \
/*std::cout << "decoder: time=" << time << ", value=" << value \
- * << std::endl;*/ \
+ * << std::endl;*/ \
row_appender.append(0, (char *)&time, sizeof(time)); \
row_appender.append(1, (char *)&value, sizeof(value)); \
} \
diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc
index 507597c0..1df563cd 100644
--- a/cpp/src/reader/block/single_device_tsblock_reader.cc
+++ b/cpp/src/reader/block/single_device_tsblock_reader.cc
@@ -29,8 +29,7 @@ SingleDeviceTsBlockReader::SingleDeviceTsBlockReader(
field_filter_(field_filter),
block_size_(block_size),
tuple_desc_(),
- tsfile_io_reader_(tsfile_io_reader) {
-}
+ tsfile_io_reader_(tsfile_io_reader) {}
int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
uint32_t block_size, Filter* time_filter,
@@ -63,9 +62,9 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task,
->get_measurement_columns()
.size());
if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes(
- device_query_task->get_device_id(),
- device_query_task->get_column_mapping()->get_measurement_columns(),
- time_series_indexs, pa_))) {
+ device_query_task->get_device_id(),
+ device_query_task->get_column_mapping()->get_measurement_columns(),
+ time_series_indexs, pa_))) {
return ret;
}
for (const auto& time_series_index : time_series_indexs) {
@@ -171,6 +170,21 @@ int SingleDeviceTsBlockReader::fill_measurements(
break;
}
}
+
+ // Align all columns, filling with nulls where data is missing.
+ uint32_t row_count =
+ col_appenders_[time_column_index_]->get_col_row_count();
+ for (auto& col_appender : col_appenders_) {
+ if (tuple_desc_.get_column_category(
+ col_appender->get_column_index()) !=
+ common::ColumnCategory::FIELD) {
+ continue;
+ }
+ while (col_appender->get_col_row_count() < row_count) {
+ col_appender->add_row();
+ col_appender->append_null();
+ }
+ }
}
return ret;
}
@@ -366,8 +380,8 @@ int SingleMeasurementColumnContext::get_current_value(char*& value,
if (value_iter_->end()) {
return common::E_NO_MORE_DATA;
}
- value = value_iter_->read(&len);
- assert(value != nullptr);
+ bool is_null = false;
+ value = value_iter_->read(&len, &is_null);
return common::E_OK;
}
@@ -392,7 +406,11 @@ void SingleMeasurementColumnContext::fill_into(
}
for (int32_t pos : pos_in_result_) {
col_appenders[pos + 1]->add_row();
- col_appenders[pos + 1]->append(val, len);
+ if (val == nullptr) {
+ col_appenders[pos + 1]->append_null();
+ } else {
+ col_appenders[pos + 1]->append(val, len);
+ }
}
}
diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc
index 396913e9..01c3b239 100644
--- a/cpp/src/reader/table_result_set.cc
+++ b/cpp/src/reader/table_result_set.cc
@@ -74,9 +74,10 @@ int TableResultSet::next(bool& has_next) {
if (!null) {
row_record_->get_field(i)->set_value(row_iterator_->get_data_type(i),
value, len, pa_);
+ row_iterator_->next(i);
}
}
- row_iterator_->next();
+ row_iterator_->update_row_id();
}
return ret;
}
diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc
index 892c0d1c..81fafc5a 100644
--- a/cpp/src/writer/time_chunk_writer.cc
+++ b/cpp/src/writer/time_chunk_writer.cc
@@ -191,4 +191,9 @@ int64_t TimeChunkWriter::estimate_max_series_mem_size() {
time_page_writer_.get_statistic()->get_type());
}
+bool TimeChunkWriter::hasData() {
+ return num_of_pages_ > 0 || (time_page_writer_.get_statistic() != nullptr &&
+ time_page_writer_.get_statistic()->count_ > 0);
+}
+
} // end namespace storage
diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h
index e03b264c..aff8e2af 100644
--- a/cpp/src/writer/time_chunk_writer.h
+++ b/cpp/src/writer/time_chunk_writer.h
@@ -28,6 +28,8 @@
namespace storage {
+// TODO: TimeChunkWriter, ValueChunkWriter, ChunkWriter can be further
+// abstracted.
class TimeChunkWriter {
public:
static const int32_t PAGES_DATA_PAGE_SIZE = 1024;
@@ -68,6 +70,8 @@ class TimeChunkWriter {
int64_t estimate_max_series_mem_size();
+ bool hasData();
+
private:
FORCE_INLINE bool is_cur_page_full() const {
// FIXME
diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc
index e9a1162a..73e4543e 100644
--- a/cpp/src/writer/tsfile_writer.cc
+++ b/cpp/src/writer/tsfile_writer.cc
@@ -313,7 +313,8 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet &tablet) {
if (col_index == -1) {
return E_COLUMN_NOT_EXIST;
}
- if (table_schema->get_data_types()[col_index] != tablet.schema_vec_->at(i).data_type_) {
+ if (table_schema->get_data_types()[col_index] !=
+ tablet.schema_vec_->at(i).data_type_) {
return E_TYPE_NOT_MATCH;
}
const common::ColumnCategory column_category =
@@ -1055,6 +1056,11 @@ int TsFileWriter::flush() {
bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
bool is_aligned) {
+ if (chunk_group->is_aligned_ &&
+ chunk_group->time_chunk_writer_ != nullptr &&
+ chunk_group->time_chunk_writer_->hasData()) {
+ return false;
+ }
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
index b1ef896a..d2f6c1c2 100644
--- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc
+++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc
@@ -32,7 +32,7 @@ using namespace storage;
using namespace common;
class TsFileWriterTableTest : public ::testing::Test {
-protected:
+ protected:
void SetUp() override {
libtsfile_init();
file_name_ = std::string("tsfile_writer_table_test_") +
@@ -49,7 +49,7 @@ protected:
std::string file_name_;
WriteFile write_file_;
-public:
+ public:
static std::string generate_random_string(int length) {
std::random_device rd;
std::mt19937 gen(rd());
@@ -101,7 +101,8 @@ public:
for (int i = 0; i < device_num; i++) {
PageArena pa;
pa.init(512, MOD_DEFAULT);
- std::string device_str = std::string("device_id_") + std::to_string(i);
+ std::string device_str =
+ std::string("device_id_") + std::to_string(i);
String literal_str(device_str, pa);
for (int l = 0; l < num_timestamp_per_device; l++) {
int row_index = i * num_timestamp_per_device + l;
@@ -450,7 +451,7 @@ TEST_F(TsFileWriterTableTest, WriteAndReadSimple) {
cur_line++;
int64_t timestamp = table_result_set->get_value<int64_t>("time");
ASSERT_EQ(table_result_set->get_value<common::String*>("device")
- ->to_std_string(),
+ ->to_std_string(),
"device" + std::to_string(timestamp));
ASSERT_EQ(table_result_set->get_value<double>("VaLue"),
timestamp * 1.1);
@@ -497,7 +498,7 @@ TEST_F(TsFileWriterTableTest, DuplicateColumnName) {
ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->write_table(tablet));
ASSERT_EQ(E_INVALID_ARG, tsfile_table_writer->register_table(
- std::make_shared<TableSchema>(*table_schema)));
+ std::make_shared<TableSchema>(*table_schema)));
delete table_schema;
}
@@ -637,8 +638,8 @@ TEST_F(TsFileWriterTableTest, WriteWithNullAndEmptyTag) {
TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
common::config_set_max_degree_of_index_node(5);
auto table_schema = gen_table_schema(0, 1, 100);
- auto tsfile_table_writer_ = std::make_shared<TsFileTableWriter>(
- &write_file_, table_schema);
+ auto tsfile_table_writer_ =
+ std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
int num_row_per_device = 10;
auto tablet = gen_tablet(table_schema, 0, 100, num_row_per_device);
ASSERT_EQ(tsfile_table_writer_->write_table(tablet), common::E_OK);
@@ -651,23 +652,23 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
ResultSet* tmp_result_set = nullptr;
ret = reader.query(table_schema->get_table_name(),
- table_schema->get_measurement_names(), 0,
- INT32_MAX, tmp_result_set);
+ table_schema->get_measurement_names(), 0, INT32_MAX,
+ tmp_result_set);
auto* table_result_set = (TableResultSet*)tmp_result_set;
bool has_next = false;
int64_t row_num = 0;
auto result_set_meta = table_result_set->get_metadata();
ASSERT_EQ(result_set_meta->get_column_count(),
- table_schema->get_columns_num() + 1); // +1: time column
+ table_schema->get_columns_num() + 1); // +1: time column
while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
auto column_schemas = table_schema->get_measurement_schemas();
- std::string tag_col_val; // "device_id_[num]"
+ std::string tag_col_val; // "device_id_[num]"
std::string tag_col_val_prefix = "device_id_";
for (const auto& column_schema : column_schemas) {
switch (column_schema->data_type_) {
case TSDataType::INT64:
if (!table_result_set->is_null(
- column_schema->measurement_name_)) {
+ column_schema->measurement_name_)) {
std::string num = tag_col_val.substr(
tag_col_val_prefix.length(),
tag_col_val.length() -
tag_col_val_prefix.length());
@@ -677,8 +678,10 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
}
break;
case TSDataType::STRING:
- tag_col_val = table_result_set->get_value<common::String*>(
- column_schema->measurement_name_)->to_std_string();
+ tag_col_val = table_result_set
+ ->get_value<common::String*>(
+ column_schema->measurement_name_)
+ ->to_std_string();
default:
break;
}
@@ -690,3 +693,81 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) {
ASSERT_EQ(reader.close(), common::E_OK);
delete table_schema;
}
+
+TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) {
+ std::vector<MeasurementSchema*> measurement_schemas;
+ std::vector<ColumnCategory> column_categories;
+ for (int i = 0; i < 3; i++) {
+ measurement_schemas.emplace_back(new MeasurementSchema(
+ "id" + std::to_string(i), TSDataType::STRING));
+ column_categories.emplace_back(ColumnCategory::TAG);
+ }
+ measurement_schemas.emplace_back(new MeasurementSchema("value", DOUBLE));
+ measurement_schemas.emplace_back(new MeasurementSchema("value1", INT32));
+ column_categories.emplace_back(ColumnCategory::FIELD);
+ column_categories.emplace_back(ColumnCategory::FIELD);
+ auto table_schema =
+ new TableSchema("testTable", measurement_schemas, column_categories);
+ auto tsfile_table_writer =
+ std::make_shared<TsFileTableWriter>(&write_file_, table_schema);
+ int time = 0;
+ Tablet tablet = Tablet(table_schema->get_measurement_names(),
+ table_schema->get_data_types(), 100);
+
+ for (int i = 0; i < 100; i++) {
+ tablet.add_timestamp(i, static_cast<int64_t>(time++));
+ tablet.add_value(i, 0, "tag1");
+ tablet.add_value(i, 1, "tag2");
+ if (i % 3 == 0) {
+ // all device has no data
+ tablet.add_value(i, 2, "tag_null");
+ } else {
+ tablet.add_value(i, 2, "tag3");
+ tablet.add_value(i, 3, 100.0f);
+ if (i % 5 == 0) {
+ tablet.add_value(i, 4, 100);
+ }
+ }
+ }
+ tsfile_table_writer->write_table(tablet);
+ tsfile_table_writer->flush();
+ tsfile_table_writer->close();
+
+ delete table_schema;
+
+ auto reader = TsFileReader();
+ reader.open(write_file_.get_file_path());
+ ResultSet* ret = nullptr;
+ int ret_value = reader.query(
+ "testTable", {"id0", "id1", "id2", "value", "value1"}, 0, 100, ret);
+ ASSERT_EQ(common::E_OK, ret_value);
+
+ auto table_result_set = (TableResultSet*)ret;
+ bool has_next = false;
+ int cur_line = 0;
+ auto schema = table_result_set->get_metadata();
+ while (IS_SUCC(table_result_set->next(has_next)) && has_next) {
+ int64_t timestamp = table_result_set->get_value<int64_t>(1);
+ ASSERT_EQ(common::String("tag1"),
+ *table_result_set->get_value<common::String*>(2));
+ ASSERT_EQ(common::String("tag2"),
+ *table_result_set->get_value<common::String*>(3));
+ if (timestamp % 3 == 0) {
+ ASSERT_EQ(common::String("tag_null"),
+ *table_result_set->get_value<common::String*>(4));
+ ASSERT_TRUE(table_result_set->is_null(5));
+ ASSERT_TRUE(table_result_set->is_null(6));
+ } else {
+ ASSERT_EQ(common::String("tag3"),
+ *table_result_set->get_value<common::String*>(4));
+ ASSERT_EQ(100.0f, table_result_set->get_value<double>(5));
+ if (timestamp % 5 == 0) {
+ ASSERT_EQ(100, table_result_set->get_value<int32_t>(6));
+ } else {
+ ASSERT_TRUE(table_result_set->is_null(6));
+ }
+ }
+ }
+ reader.destroy_query_data_set(table_result_set);
+ ASSERT_EQ(reader.close(), common::E_OK);
+}