This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch fix_only_timestamp in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit ea6a1a4ccab86c856552b9cb1ce8ffb69c1d2c6d Author: ColinLee <[email protected]> AuthorDate: Fri Jun 27 11:37:00 2025 +0800 Fixes the issue where data with only timestamps could not be inserted or queried. --- cpp/src/common/tsblock/tsblock.h | 11 +++- cpp/src/common/tsblock/tuple_desc.h | 4 ++ cpp/src/reader/aligned_chunk_reader.cc | 61 +++++++++++++---- .../reader/block/single_device_tsblock_reader.cc | 35 +++++++--- cpp/src/reader/table_result_set.cc | 3 +- cpp/src/writer/time_chunk_writer.cc | 6 ++ cpp/src/writer/time_chunk_writer.h | 2 + cpp/src/writer/tsfile_writer.cc | 8 ++- .../writer/table_view/tsfile_writer_table_test.cc | 76 ++++++++++++++++++++++ 9 files changed, 184 insertions(+), 22 deletions(-) diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h index 4316e8f6..3e1cbe61 100644 --- a/cpp/src/common/tsblock/tsblock.h +++ b/cpp/src/common/tsblock/tsblock.h @@ -258,6 +258,13 @@ class RowIterator { } } + FORCE_INLINE void next(size_t ind) { + ASSERT(row_id_ < tsblock_->row_count_); + tsblock_->vectors_[ind]->update_offset(); + } + + FORCE_INLINE void update_row_id() { row_id_++; } + FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len, bool *__restrict null) { ASSERT(column_index < column_count_); @@ -287,8 +294,10 @@ class ColIterator { FORCE_INLINE bool end() const { return row_id_ >= tsblock_->row_count_; } FORCE_INLINE void next() { + if (!vec_->is_null(row_id_)) { + vec_->update_offset(); + } ++row_id_; - vec_->update_offset(); } FORCE_INLINE bool has_null() { return vec_->has_null(); } diff --git a/cpp/src/common/tsblock/tuple_desc.h b/cpp/src/common/tsblock/tuple_desc.h index 98bd341d..fea0252e 100644 --- a/cpp/src/common/tsblock/tuple_desc.h +++ b/cpp/src/common/tsblock/tuple_desc.h @@ -71,6 +71,10 @@ class TupleDesc { return column_list_[index].data_type_; } + FORCE_INLINE common::ColumnCategory get_column_category(uint32_t index) { + return column_list_[index].column_category_; + } + FORCE_INLINE std::string get_column_name(uint32_t index) { return column_list_[index].column_name_; } diff --git a/cpp/src/reader/aligned_chunk_reader.cc b/cpp/src/reader/aligned_chunk_reader.cc index 230661f3..d629adac 100644 --- a/cpp/src/reader/aligned_chunk_reader.cc +++ b/cpp/src/reader/aligned_chunk_reader.cc @@ -302,7 +302,8 @@ int AlignedChunkReader::read_from_file_and_rewrap( int offset = chunk_meta->offset_of_chunk_header_ + chunk_visit_offset; int read_size = (want_size < DEFAULT_READ_SIZE ? DEFAULT_READ_SIZE : want_size); - if (file_data_buf_size < read_size || (may_shrink && read_size < file_data_buf_size / 10)) { + if (file_data_buf_size < read_size || + (may_shrink && read_size < file_data_buf_size / 10)) { file_data_buf = (char *)mem_realloc(file_data_buf, read_size); if (IS_NULL(file_data_buf)) { return E_OOM; @@ -366,7 +367,6 @@ int AlignedChunkReader::decode_cur_time_page_data() { uint32_t time_compressed_buf_size = 0; uint32_t time_uncompressed_buf_size = 0; - // Step 2: do uncompress if (IS_SUCC(ret)) { time_compressed_buf = @@ -519,9 +519,9 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( uint32_t mask = 1 << 7; \ int64_t time = 0; \ CppType value; \ - while ((time_decoder_->has_remaining() || time_in.has_remaining()) \ - && (value_decoder_->has_remaining() || \ - value_in.has_remaining())){ \ + while ( \ + (time_decoder_->has_remaining() || time_in.has_remaining()) && \ + (value_decoder_->has_remaining() || value_in.has_remaining())) { \ cur_value_index++; \ if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & \ 0xFF) & \ @@ -530,8 +530,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( if (ret != E_OK) { \ break; \ } \ - ret = value_decoder_->read_##ReadType(value, \ - value_in); \ + ret = value_decoder_->read_##ReadType(value, value_in); \ if (ret != E_OK) { \ break; \ } \ @@ -539,7 +538,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( } \ if (UNLIKELY(!row_appender.add_row())) { \ ret = E_OVERFLOW; \ - cur_value_index--; \ + cur_value_index--; \ break; \ } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { \ } else if (RET_FAIL(value_decoder_->read_##ReadType(value, \ @@ -549,7 +548,7 @@ int AlignedChunkReader::decode_time_value_buf_into_tsblock( continue; \ } else { \ /*std::cout << "decoder: time=" << time << ", value=" << value \ - * << std::endl;*/ \ + * << std::endl;*/ \ row_appender.append(0, (char *)&time, sizeof(time)); \ row_appender.append(1, (char *)&value, sizeof(value)); \ } \ @@ -596,6 +595,8 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype( ByteStream &time_in, ByteStream &value_in, TsBlock *ret_tsblock, Filter *filter) { int ret = E_OK; + uint32_t mask = 1 << 7; + int64_t time = 0; RowAppender row_appender(ret_tsblock); switch (value_chunk_header_.data_type_) { case common::BOOLEAN: @@ -615,8 +616,46 @@ int AlignedChunkReader::decode_tv_buf_into_tsblock_by_datatype( row_appender); break; case common::DOUBLE: - DECODE_TYPED_TV_INTO_TSBLOCK(double, double, time_in_, value_in_, - row_appender); + // DECODE_TYPED_TV_INTO_TSBLOCK(double, double, time_in_, value_in_, + // row_appender); + double value; + while ( + (time_decoder_->has_remaining() || time_in.has_remaining()) && + (value_decoder_->has_remaining() || value_in.has_remaining())) { + cur_value_index++; + if (((value_page_col_notnull_bitmap_[cur_value_index / 8] & + 0xFF) & + (mask >> (cur_value_index % 8))) == 0) { + ret = time_decoder_->read_int64(time, time_in); + if (ret != E_OK) { + break; + } + if (UNLIKELY(!row_appender.add_row())) { + ret = E_OVERFLOW; + break; + } + row_appender.append(0, (char *)&time, sizeof(time)); + row_appender.append_null(1); + continue; + } + if (UNLIKELY(!row_appender.add_row())) { + ret = E_OVERFLOW; + cur_value_index--; + break; + } else if (RET_FAIL(time_decoder_->read_int64(time, time_in))) { + } else if (RET_FAIL( + value_decoder_->read_double(value, value_in))) { + } else if (filter != nullptr && !filter->satisfy(time, value)) { + row_appender.backoff_add_row(); + continue; + } else { + /*std::cout << "decoder: time=" << time << ", value=" << + * value + * << std::endl;*/ + row_appender.append(0, (char *)&time, sizeof(time)); + row_appender.append(1, (char *)&value, sizeof(value)); + } + } break; default: ret = E_NOT_SUPPORT; diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc index 507597c0..9b5f9bac 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.cc +++ b/cpp/src/reader/block/single_device_tsblock_reader.cc @@ -19,6 +19,8 @@ #include "single_device_tsblock_reader.h" +#include <cwrapper/tsfile_cwrapper.h> + namespace storage { SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( @@ -29,8 +31,7 @@ SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( field_filter_(field_filter), block_size_(block_size), tuple_desc_(), - tsfile_io_reader_(tsfile_io_reader) { -} + tsfile_io_reader_(tsfile_io_reader) {} int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, uint32_t block_size, Filter* time_filter, @@ -63,9 +64,9 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, ->get_measurement_columns() .size()); if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes( - device_query_task->get_device_id(), - device_query_task->get_column_mapping()->get_measurement_columns(), - time_series_indexs, pa_))) { + device_query_task->get_device_id(), + device_query_task->get_column_mapping()->get_measurement_columns(), + time_series_indexs, pa_))) { return ret; } for (const auto& time_series_index : time_series_indexs) { @@ -171,6 +172,20 @@ int SingleDeviceTsBlockReader::fill_measurements( break; } } + + uint32_t row_count = + col_appenders_[time_column_index_]->get_col_row_count(); + for (auto& col_appender : col_appenders_) { + if (tuple_desc_.get_column_category( + col_appender->get_column_index()) != + common::ColumnCategory::FIELD) { + continue; + } + while (col_appender->get_col_row_count() < row_count) { + col_appender->add_row(); + col_appender->append_null(); + } + } } return ret; } @@ -366,8 +381,8 @@ int SingleMeasurementColumnContext::get_current_value(char*& value, if (value_iter_->end()) { return common::E_NO_MORE_DATA; } - value = value_iter_->read(&len); - assert(value != nullptr); + bool is_null = false; + value = value_iter_->read(&len, &is_null); return common::E_OK; } @@ -392,7 +407,11 @@ void SingleMeasurementColumnContext::fill_into( } for (int32_t pos : pos_in_result_) { col_appenders[pos + 1]->add_row(); - col_appenders[pos + 1]->append(val, len); + if (val == nullptr) { + col_appenders[pos + 1]->append_null(); + } else { + col_appenders[pos + 1]->append(val, len); + } } } diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc index 396913e9..01c3b239 100644 --- a/cpp/src/reader/table_result_set.cc +++ b/cpp/src/reader/table_result_set.cc @@ -74,9 +74,10 @@ int TableResultSet::next(bool& has_next) { if (!null) { row_record_->get_field(i)->set_value(row_iterator_->get_data_type(i), value, len, pa_); + row_iterator_->next(i); } } - row_iterator_->next(); + row_iterator_->update_row_id(); } return ret; } diff --git a/cpp/src/writer/time_chunk_writer.cc b/cpp/src/writer/time_chunk_writer.cc index 892c0d1c..47a684d6 100644 --- a/cpp/src/writer/time_chunk_writer.cc +++ b/cpp/src/writer/time_chunk_writer.cc @@ -191,4 +191,10 @@ int64_t TimeChunkWriter::estimate_max_series_mem_size() { time_page_writer_.get_statistic()->get_type()); } +bool TimeChunkWriter::hasData() { + return num_of_pages_ > 0 || + (time_page_writer_.get_statistic() != nullptr && + time_page_writer_.get_statistic()->count_ > 0); +} + } // end namespace storage diff --git a/cpp/src/writer/time_chunk_writer.h b/cpp/src/writer/time_chunk_writer.h index e03b264c..02c15a5b 100644 --- a/cpp/src/writer/time_chunk_writer.h +++ b/cpp/src/writer/time_chunk_writer.h @@ -68,6 +68,8 @@ class TimeChunkWriter { int64_t estimate_max_series_mem_size(); + bool hasData(); + private: FORCE_INLINE bool is_cur_page_full() const { // FIXME diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index e9a1162a..73e4543e 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -313,7 +313,8 @@ int TsFileWriter::do_check_and_prepare_tablet(Tablet &tablet) { if (col_index == -1) { return E_COLUMN_NOT_EXIST; } - if (table_schema->get_data_types()[col_index] != tablet.schema_vec_->at(i).data_type_) { + if (table_schema->get_data_types()[col_index] != + tablet.schema_vec_->at(i).data_type_) { return E_TYPE_NOT_MATCH; } const common::ColumnCategory column_category = @@ -1055,6 +1056,11 @@ int TsFileWriter::flush() { bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group, bool is_aligned) { + if (chunk_group->is_aligned_ && + chunk_group->time_chunk_writer_ != nullptr && + chunk_group->time_chunk_writer_->hasData()) { + return false; + } MeasurementSchemaMap &map = chunk_group->measurement_schema_map_; for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end(); ms_iter++) { diff --git a/cpp/test/writer/table_view/tsfile_writer_table_test.cc b/cpp/test/writer/table_view/tsfile_writer_table_test.cc index b1ef896a..67f02741 100644 --- a/cpp/test/writer/table_view/tsfile_writer_table_test.cc +++ b/cpp/test/writer/table_view/tsfile_writer_table_test.cc @@ -690,3 +690,79 @@ TEST_F(TsFileWriterTableTest, MultiDeviceMultiFields) { ASSERT_EQ(reader.close(), common::E_OK); delete table_schema; } + + +TEST_F(TsFileWriterTableTest, WriteDataWithEmptyField) { + std::vector<MeasurementSchema*> measurement_schemas; + std::vector<ColumnCategory> column_categories; + for (int i = 0; i < 3; i++) { + measurement_schemas.emplace_back(new MeasurementSchema( + "id" + std::to_string(i), TSDataType::STRING)); + column_categories.emplace_back(ColumnCategory::TAG); + } + measurement_schemas.emplace_back(new MeasurementSchema("value", DOUBLE)); + measurement_schemas.emplace_back(new MeasurementSchema("value1", DOUBLE)); + column_categories.emplace_back(ColumnCategory::FIELD); + column_categories.emplace_back(ColumnCategory::FIELD); + auto table_schema = + new TableSchema("testTable", measurement_schemas, column_categories); + auto tsfile_table_writer = + std::make_shared<TsFileTableWriter>(&write_file_, table_schema); + int time = 0; + Tablet tablet = Tablet(table_schema->get_measurement_names(), + table_schema->get_data_types(), 100); + + for (int i = 0; i < 100; i++) { + tablet.add_timestamp(i, static_cast<int64_t>(time++)); + tablet.add_value(i, 0, "tag1"); + tablet.add_value(i, 1, "tag2"); + if (i % 3 == 0) { + // all device has no data + tablet.add_value(i, 2, "tag_null"); + } else { + tablet.add_value(i, 2, "tag3"); + tablet.add_value(i, 3, 100.0f); + if (i % 5 == 0) { + tablet.add_value(i, 4, 100.0f); + } + } + } + tsfile_table_writer->write_table(tablet); + tsfile_table_writer->flush(); + tsfile_table_writer->close(); + + delete table_schema; + + auto reader = TsFileReader(); + reader.open(write_file_.get_file_path()); + ResultSet* ret = nullptr; + int ret_value = + reader.query("testTable", {"id0", "id1", "id2", "value", "value1"}, 0, 100, ret); + ASSERT_EQ(common::E_OK, ret_value); + + auto table_result_set = (TableResultSet*)ret; + bool has_next = false; + int cur_line = 0; + auto schema = table_result_set->get_metadata(); + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + int64_t timestamp = table_result_set->get_value<int64_t>(1); + ASSERT_EQ(common::String("tag1") , *table_result_set->get_value<common::String*>(2)); + ASSERT_EQ(common::String("tag2") , *table_result_set->get_value<common::String*>(3)); + if (timestamp % 3 == 0) { + ASSERT_EQ(common::String("tag_null") , *table_result_set->get_value<common::String*>(4)); + ASSERT_TRUE(table_result_set->is_null(5)); + ASSERT_TRUE(table_result_set->is_null(6)); + } else { + ASSERT_EQ(common::String("tag3") , *table_result_set->get_value<common::String*>(4)); + ASSERT_EQ(100.0f, table_result_set->get_value<double>(5)); + if (timestamp % 5 == 0) { + ASSERT_EQ(100.0f , table_result_set->get_value<double>(6)); + } else { + ASSERT_TRUE(table_result_set->is_null(6)); + } + + } + } + reader.destroy_query_data_set(table_result_set); + ASSERT_EQ(reader.close(), common::E_OK); +}
