This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_python_V4 in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 813445d31715f7b94d1407771a3c1a5f379857e1 Author: colin <[email protected]> AuthorDate: Fri Feb 28 22:10:24 2025 +0800 fix tree index. --- cpp/src/reader/qds_with_timegenerator.cc | 15 ++++++--- cpp/src/reader/qds_without_timegenerator.cc | 11 +++--- cpp/src/reader/table_result_set.cc | 28 ++++++++-------- cpp/test/writer/tsfile_writer_test.cc | 52 ++++++++++++++++++++++------- 4 files changed, 70 insertions(+), 36 deletions(-) diff --git a/cpp/src/reader/qds_with_timegenerator.cc b/cpp/src/reader/qds_with_timegenerator.cc index 3036e32f..fd0e1292 100644 --- a/cpp/src/reader/qds_with_timegenerator.cc +++ b/cpp/src/reader/qds_with_timegenerator.cc @@ -111,7 +111,8 @@ void *ValueAt::at(int64_t target_timestamp) { cur_time_ = INT64_MAX; return nullptr; } - data_type_ = tsblock_->get_tuple_desc()->get_column_schema(1).data_type_; + data_type_ = + tsblock_->get_tuple_desc()->get_column_schema(1).data_type_; time_col_iter_ = new ColIterator(0, tsblock_); value_col_iter_ = new ColIterator(1, tsblock_); } @@ -296,9 +297,10 @@ int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) { for (const auto &path : paths) { column_names.push_back(path.full_path_); } + index_lookup_.insert({"time", 0}); for (size_t i = 0; i < paths.size(); i++) { ValueAt va; - index_lookup_.insert({paths[i].measurement_, i}); + index_lookup_.insert({paths[i].measurement_, i + 1}); if (RET_FAIL(io_reader_->alloc_ssi( paths[i].device_id_, paths[i].measurement_, va.ssi_, pa_))) { } else { @@ -307,8 +309,9 @@ int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) { value_at_vec_.push_back(va); } } - result_set_metadata_ = std::make_shared<ResultSetMetadata>(column_names, data_types); - row_record_ = new RowRecord(value_at_vec_.size()); + result_set_metadata_ = + std::make_shared<ResultSetMetadata>(column_names, data_types); + row_record_ = new RowRecord(value_at_vec_.size() + 1); tree_ = construct_node_tree(qe->expression_); return E_OK; } @@ -354,6 
+357,7 @@ int QDSWithTimeGenerator::next(bool &has_next) { return E_OK; } row_record_->set_timestamp(timestamp); + row_record_->get_field(0)->set_value(TSDataType::INT64, &timestamp, pa_); #if DEBUG_SE std::cout << "QDSWithTimeGenerator::get_next: timestamp=" << timestamp << ", will generate row at this timestamp." << std::endl; @@ -362,7 +366,8 @@ int QDSWithTimeGenerator::next(bool &has_next) { for (size_t i = 0; i < value_at_vec_.size(); i++) { ValueAt &va = value_at_vec_[i]; void *val_obj_ptr = va.at(timestamp); - row_record_->get_field(i)->set_value(va.data_type_, val_obj_ptr, pa_); + row_record_->get_field(i + 1)->set_value(va.data_type_, val_obj_ptr, + pa_); } tree_->next_timestamp(timestamp); diff --git a/cpp/src/reader/qds_without_timegenerator.cc b/cpp/src/reader/qds_without_timegenerator.cc index db633370..0a4a6f38 100644 --- a/cpp/src/reader/qds_without_timegenerator.cc +++ b/cpp/src/reader/qds_without_timegenerator.cc @@ -45,6 +45,7 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, if (global_time_expression != nullptr) { global_time_filter = global_time_expression->filter_; } + index_lookup_.insert({"time", 0}); for (size_t i = 0; i < origin_path_count; i++) { TsFileSeriesScanIterator *ssi = nullptr; ret = io_reader_->alloc_ssi(paths[i].device_id_, paths[i].measurement_, @@ -52,7 +53,7 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, if (ret != 0) { return ret; } else { - index_lookup_.insert({paths[i].measurement_, i}); + index_lookup_.insert({paths[i].measurement_, i + 1}); ssi_vec_.push_back(ssi); valid_paths.push_back(paths[i]); column_names.push_back(paths[i].full_path_); @@ -60,7 +61,7 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, } size_t path_count = valid_paths.size(); - row_record_ = new RowRecord(path_count); + row_record_ = new RowRecord(path_count + 1); tsblocks_.resize(path_count); time_iters_.resize(path_count); value_iters_.resize(path_count); @@ -69,7 +70,8 @@ int
QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, get_next_tsblock(i, true); data_types.push_back(value_iters_[i]->get_data_type()); } - result_set_metadata_ = std::make_shared<ResultSetMetadata>(column_names, data_types); + result_set_metadata_ = + std::make_shared<ResultSetMetadata>(column_names, data_types); return E_OK; // ignore invalid timeseries } @@ -114,12 +116,13 @@ int QDSWithoutTimeGenerator::next(bool &has_next) { } int64_t time = heap_time_.begin()->first; row_record_->set_timestamp(time); + row_record_->get_field(0)->set_value(INT64, &time, pa_); uint32_t count = heap_time_.count(time); std::multimap<int64_t, uint32_t>::iterator iter = heap_time_.find(time); for (uint32_t i = 0; i < count; ++i) { uint32_t len = 0; - row_record_->get_field(iter->second) + row_record_->get_field(iter->second + 1) ->set_value(value_iters_[iter->second]->get_data_type(), value_iters_[iter->second]->read(&len), pa_); value_iters_[iter->second]->next(); diff --git a/cpp/src/reader/table_result_set.cc b/cpp/src/reader/table_result_set.cc index f520a37d..a6c8f963 100644 --- a/cpp/src/reader/table_result_set.cc +++ b/cpp/src/reader/table_result_set.cc @@ -23,20 +23,20 @@ void TableResultSet::init() { row_record_ = new RowRecord(column_names_.size() + 1); pa_.reset(); pa_.init(512, common::MOD_TSFILE_READER); - index_lookup_.reserve(column_names_.size()); + index_lookup_.reserve(column_names_.size() + 1); + index_lookup_.insert({"time", 0}); for (uint32_t i = 0; i < column_names_.size(); ++i) { index_lookup_.insert({column_names_[i], i + 1}); } - result_set_metadata_ = std::make_shared<ResultSetMetadata>(column_names_, data_types_); + result_set_metadata_ = + std::make_shared<ResultSetMetadata>(column_names_, data_types_); } -TableResultSet::~TableResultSet() { - close(); -} +TableResultSet::~TableResultSet() { close(); } -int TableResultSet::next(bool &has_next) { +int TableResultSet::next(bool& has_next) { int ret = common::E_OK; - while(row_iterator_ == nullptr || 
!row_iterator_->has_next()) { + while (row_iterator_ == nullptr || !row_iterator_->has_next()) { if (RET_FAIL(tsblock_reader_->has_next(has_next))) { return ret; } else if (!has_next) { @@ -60,7 +60,9 @@ int TableResultSet::next(bool &has_next) { bool null = false; row_record_->reset(); for (uint32_t i = 0; i < row_iterator_->get_column_count(); ++i) { - row_record_->get_field(i)->set_value(row_iterator_->get_data_type(i), row_iterator_->read(i, &len, &null), pa_); + row_record_->get_field(i)->set_value( + row_iterator_->get_data_type(i), + row_iterator_->read(i, &len, &null), pa_); } row_iterator_->next(); } @@ -81,9 +83,7 @@ bool TableResultSet::is_null(uint32_t column_index) { return row_record_->get_field(column_index - 1) == nullptr; } -RowRecord* TableResultSet::get_row_record() { - return row_record_; -} +RowRecord* TableResultSet::get_row_record() { return row_record_; } std::shared_ptr<ResultSetMetadata> TableResultSet::get_metadata() { return result_set_metadata_; @@ -103,9 +103,7 @@ void TableResultSet::close() { if (tsblock_) { delete tsblock_; tsblock_ = nullptr; - } + } } - - -} \ No newline at end of file +} // namespace storage \ No newline at end of file diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc index bfeb8917..ce082c2b 100644 --- a/cpp/test/writer/tsfile_writer_test.cc +++ b/cpp/test/writer/tsfile_writer_test.cc @@ -191,11 +191,11 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) { break; } cur_record_num++; - ASSERT_EQ(qds->get_value<float>(1), (float)1.0); - ASSERT_EQ(qds->get_value<int64_t>(2), (int64_t)415412); - ASSERT_EQ(qds->get_value<bool>(3), true); - ASSERT_EQ(qds->get_value<double>(4), (double)2.0); - ASSERT_EQ(qds->get_value<common::String *>(5)->compare(literal_str), 0); + ASSERT_EQ(qds->get_value<float>(2), (float)1.0); + ASSERT_EQ(qds->get_value<int64_t>(3), (int64_t)415412); + ASSERT_EQ(qds->get_value<bool>(4), true); + ASSERT_EQ(qds->get_value<double>(5), (double)2.0); + 
ASSERT_EQ(qds->get_value<common::String *>(6)->compare(literal_str), 0); ASSERT_EQ(qds->get_value<float>(measurement_names[0]), (float)1.0); ASSERT_EQ(qds->get_value<int64_t>(measurement_names[1]), @@ -250,8 +250,9 @@ TEST_F(TsFileWriterTest, WriteMultipleRecords) { TEST_F(TsFileWriterTest, WriteDiffrentTypeCombination) { std::string device_path = "device1"; std::string measurement_name = "temperature"; - std::vector<TSDataType> data_types = { - TSDataType::INT32, TSDataType::INT64, TSDataType::FLOAT, TSDataType::DOUBLE}; + std::vector<TSDataType> data_types = {TSDataType::INT32, TSDataType::INT64, + TSDataType::FLOAT, + TSDataType::DOUBLE}; std::vector<TSEncoding> encodings = {TSEncoding::PLAIN, TSEncoding::TS_2DIFF}; std::vector<CompressionType> compression_types = { @@ -259,15 +260,17 @@ TEST_F(TsFileWriterTest, WriteDiffrentTypeCombination) { CompressionType::GZIP, CompressionType::LZ4}; std::vector<MeasurementSchema> schema_vecs; - schema_vecs.reserve(data_types.size() * encodings.size() * compression_types.size()); + schema_vecs.reserve(data_types.size() * encodings.size() * + compression_types.size()); int idx = 0; for (auto data_type : data_types) { for (auto encoding_type : encodings) { for (auto compression_type : compression_types) { - schema_vecs.emplace_back(MeasurementSchema(measurement_name + std::to_string(idx), - data_type, encoding_type, compression_type)); - tsfile_writer_->register_timeseries( - device_path, schema_vecs[idx++]); + schema_vecs.emplace_back(MeasurementSchema( + measurement_name + std::to_string(idx), data_type, + encoding_type, compression_type)); + tsfile_writer_->register_timeseries(device_path, + schema_vecs[idx++]); } } } @@ -366,6 +369,11 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsMultiFlush) { record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { + if (i == 0) { + EXPECT_EQ(std::to_string(record->get_timestamp()), + field_to_string(record->get_field(i))); + continue; + 
} EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } @@ -443,6 +451,11 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsAlignedMultiFlush) { record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { + if (i == 0) { + ASSERT_EQ(field_to_string(record->get_field(0)), + std::to_string(record->get_timestamp())); + continue; + } EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } @@ -741,6 +754,11 @@ TEST_F(TsFileWriterTest, WriteAlignedTimeseries) { record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { + if (i == 0) { + EXPECT_EQ(std::to_string(record->get_timestamp()), + field_to_string(record->get_field(i))); + continue; + } EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } @@ -806,6 +824,11 @@ TEST_F(TsFileWriterTest, WriteAlignedMultiFlush) { record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { + if (i == 0) { + EXPECT_EQ(std::to_string(record->get_timestamp()), + field_to_string(record->get_field(i))); + continue; + } EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } @@ -871,6 +894,11 @@ TEST_F(TsFileWriterTest, WriteAlignedPartialData) { record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { + if (i == 0) { + EXPECT_EQ(std::to_string(record->get_timestamp()), + field_to_string(record->get_field(i))); + continue; + } EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); }
