This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_support_read_tree in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit c735b2fc08089090e428252cd98d95fb69e13af8 Author: ColinLee <[email protected]> AuthorDate: Thu Nov 20 14:52:31 2025 +0800 fix ci. --- cpp/src/common/schema.h | 83 ++++++++++++---------- .../reader/block/single_device_tsblock_reader.cc | 8 +-- cpp/src/reader/table_query_executor.cc | 18 ++--- 3 files changed, 60 insertions(+), 49 deletions(-) diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h index 06e7e7e4..499dd5bc 100644 --- a/cpp/src/common/schema.h +++ b/cpp/src/common/schema.h @@ -46,8 +46,8 @@ struct MeasurementSchema { common::TSDataType data_type_; common::TSEncoding encoding_; common::CompressionType compression_type_; - storage::ChunkWriter *chunk_writer_; - ValueChunkWriter *value_chunk_writer_; + storage::ChunkWriter* chunk_writer_; + ValueChunkWriter* value_chunk_writer_; std::map<std::string, std::string> props_; MeasurementSchema() @@ -58,7 +58,7 @@ struct MeasurementSchema { chunk_writer_(nullptr), value_chunk_writer_(nullptr) {} - MeasurementSchema(const std::string &measurement_name, + MeasurementSchema(const std::string& measurement_name, common::TSDataType data_type) : measurement_name_(measurement_name), data_type_(data_type), @@ -67,7 +67,7 @@ struct MeasurementSchema { chunk_writer_(nullptr), value_chunk_writer_(nullptr) {} - MeasurementSchema(const std::string &measurement_name, + MeasurementSchema(const std::string& measurement_name, common::TSDataType data_type, common::TSEncoding encoding, common::CompressionType compression_type) : measurement_name_(measurement_name), @@ -88,7 +88,7 @@ struct MeasurementSchema { } } - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL( common::SerializationUtil::write_str(measurement_name_, out))) { @@ -102,7 +102,7 @@ struct MeasurementSchema { if (ret == common::E_OK) { if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(), out))) { - for (const auto &prop : props_) { + for (const auto& prop : 
props_) { if (RET_FAIL(common::SerializationUtil::write_str( prop.first, out))) { } else if (RET_FAIL(common::SerializationUtil::write_str( @@ -115,7 +115,7 @@ struct MeasurementSchema { return ret; } - int deserialize_from(common::ByteStream &in) { + int deserialize_from(common::ByteStream& in) { int ret = common::E_OK; uint8_t data_type = common::TSDataType::INVALID_DATATYPE, encoding = common::TSEncoding::INVALID_ENCODING, @@ -153,8 +153,8 @@ struct MeasurementSchema { } }; -typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap; -typedef std::map<std::string, MeasurementSchema *>::iterator +typedef std::map<std::string, MeasurementSchema*> MeasurementSchemaMap; +typedef std::map<std::string, MeasurementSchema*>::iterator MeasurementSchemaMapIter; typedef std::pair<MeasurementSchemaMapIter, bool> MeasurementSchemaMapInsertResult; @@ -164,7 +164,7 @@ struct MeasurementSchemaGroup { // measurement_name -> MeasurementSchema MeasurementSchemaMap measurement_schema_map_; bool is_aligned_ = false; - TimeChunkWriter *time_chunk_writer_ = nullptr; + TimeChunkWriter* time_chunk_writer_ = nullptr; ~MeasurementSchemaGroup() { if (time_chunk_writer_ != nullptr) { @@ -195,11 +195,11 @@ class TableSchema { * Each ColumnSchema defines the schema for one column * in the table. 
*/ - TableSchema(const std::string &table_name, - const std::vector<common::ColumnSchema> &column_schemas) + TableSchema(const std::string& table_name, + const std::vector<common::ColumnSchema>& column_schemas) : table_name_(table_name) { to_lowercase_inplace(table_name_); - for (const common::ColumnSchema &column_schema : column_schemas) { + for (const common::ColumnSchema& column_schema : column_schemas) { column_schemas_.emplace_back(std::make_shared<MeasurementSchema>( column_schema.get_column_name(), column_schema.get_data_type())); @@ -207,16 +207,16 @@ class TableSchema { column_schema.get_column_category()); } int idx = 0; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { to_lowercase_inplace(measurement_schema->measurement_name_); column_pos_index_.insert( std::make_pair(measurement_schema->measurement_name_, idx++)); } } - TableSchema(const std::string &table_name, - const std::vector<MeasurementSchema *> &column_schemas, - const std::vector<common::ColumnCategory> &column_categories) + TableSchema(const std::string& table_name, + const std::vector<MeasurementSchema*>& column_schemas, + const std::vector<common::ColumnCategory>& column_categories) : table_name_(table_name), column_categories_(column_categories) { to_lowercase_inplace(table_name_); for (const auto column_schema : column_schemas) { @@ -226,34 +226,42 @@ class TableSchema { } } int idx = 0; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { to_lowercase_inplace(measurement_schema->measurement_name_); column_pos_index_.insert( std::make_pair(measurement_schema->measurement_name_, idx++)); } } - TableSchema(TableSchema &&other) noexcept + TableSchema(TableSchema&& other) noexcept : table_name_(std::move(other.table_name_)), column_schemas_(std::move(other.column_schemas_)), column_categories_(std::move(other.column_categories_)) {} - TableSchema(const 
TableSchema &other) noexcept + TableSchema(const TableSchema& other) noexcept : table_name_(other.table_name_), column_categories_(other.column_categories_) { - for (const auto &column_schema : other.column_schemas_) { + for (const auto& column_schema : other.column_schemas_) { // Just call default construction column_schemas_.emplace_back( std::make_shared<MeasurementSchema>(*column_schema)); } int idx = 0; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { column_pos_index_.insert( std::make_pair(measurement_schema->measurement_name_, idx++)); } } - int serialize_to(common::ByteStream &out) { + // In cases where data is retrieved from a tree to form the table, + // there is no table name in the tree path, so adjustments are needed for + // this scenario. This flag is used specifically for such cases. + // TODO(Colin): remove this. + void set_virtual_table() { is_virtual_table_ = true; } + + bool is_virtual_table() { return is_virtual_table_; } + + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_var_uint( column_schemas_.size(), out))) { @@ -271,7 +279,7 @@ class TableSchema { return ret; } - int deserialize(common::ByteStream &in) { + int deserialize(common::ByteStream& in) { int ret = common::E_OK; uint32_t num_columns; if (RET_FAIL( @@ -294,9 +302,9 @@ class TableSchema { ~TableSchema() { column_schemas_.clear(); } - const std::string &get_table_name() { return table_name_; } + const std::string& get_table_name() { return table_name_; } - void set_table_name(const std::string &table_name) { + void set_table_name(const std::string& table_name) { table_name_ = table_name; } @@ -310,7 +318,7 @@ class TableSchema { int32_t get_columns_num() const { return column_schemas_.size(); } - int find_column_index(const std::string &column_name) { + int find_column_index(const std::string& column_name) { std::string lower_case_column_name = 
to_lower(column_name); auto it = column_pos_index_.find(lower_case_column_name); if (it != column_pos_index_.end()) { @@ -333,10 +341,10 @@ class TableSchema { size_t get_column_pos_index_num() const { return column_pos_index_.size(); } - void update(ChunkGroupMeta *chunk_group_meta) { + void update(ChunkGroupMeta* chunk_group_meta) { for (auto iter = chunk_group_meta->chunk_meta_list_.begin(); iter != chunk_group_meta->chunk_meta_list_.end(); iter++) { - auto &chunk_meta = iter.get(); + auto& chunk_meta = iter.get(); if (chunk_meta->data_type_ == common::VECTOR) { continue; } @@ -365,7 +373,7 @@ class TableSchema { std::vector<common::TSDataType> get_data_types() const { std::vector<common::TSDataType> ret; - for (const auto &measurement_schema : column_schemas_) { + for (const auto& measurement_schema : column_schemas_) { ret.emplace_back(measurement_schema->data_type_); } return ret; @@ -375,12 +383,12 @@ class TableSchema { return column_categories_; } - std::vector<std::shared_ptr<MeasurementSchema> > get_measurement_schemas() + std::vector<std::shared_ptr<MeasurementSchema>> get_measurement_schemas() const { return column_schemas_; } - common::ColumnSchema get_column_schema(const std::string &column_name) { + common::ColumnSchema get_column_schema(const std::string& column_name) { int column_idx = find_column_index(column_name); if (column_idx == -1) { return common::ColumnSchema(); @@ -394,7 +402,7 @@ class TableSchema { } } - int32_t find_id_column_order(const std::string &column_name) { + int32_t find_id_column_order(const std::string& column_name) { std::string lower_case_column_name = to_lower(column_name); int column_order = 0; @@ -412,17 +420,18 @@ class TableSchema { private: std::string table_name_; - std::vector<std::shared_ptr<MeasurementSchema> > column_schemas_; + std::vector<std::shared_ptr<MeasurementSchema>> column_schemas_; std::vector<common::ColumnCategory> column_categories_; std::map<std::string, int> column_pos_index_; + bool 
is_virtual_table_ = false; }; struct Schema { - typedef std::unordered_map<std::string, std::shared_ptr<TableSchema> > + typedef std::unordered_map<std::string, std::shared_ptr<TableSchema>> TableSchemasMap; TableSchemasMap table_schema_map_; - void update_table_schema(ChunkGroupMeta *chunk_group_meta) { + void update_table_schema(ChunkGroupMeta* chunk_group_meta) { std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_; auto table_name = device_id->get_table_name(); if (table_schema_map_.find(table_name) == table_schema_map_.end()) { @@ -431,7 +440,7 @@ struct Schema { table_schema_map_[table_name]->update(chunk_group_meta); } void register_table_schema( - const std::shared_ptr<TableSchema> &table_schema) { + const std::shared_ptr<TableSchema>& table_schema) { table_schema_map_[table_schema->get_table_name()] = table_schema; } }; diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc index 4137006e..8ae22b9b 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.cc +++ b/cpp/src/reader/block/single_device_tsblock_reader.cc @@ -82,8 +82,8 @@ int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, device_query_task->get_column_mapping()->get_id_columns()) { const auto& column_pos_in_result = device_query_task->get_column_mapping()->get_column_pos(id_column); - int column_pos_in_id = - table_schema->find_id_column_order(id_column) + 1; + int column_pos_in_id = table_schema->find_id_column_order(id_column) + + (!table_schema->is_virtual_table()); id_column_contexts_.insert(std::make_pair( id_column, IdColumnContext(column_pos_in_result, column_pos_in_id))); @@ -218,8 +218,8 @@ int SingleDeviceTsBlockReader::fill_ids() { device_query_task_->get_device_id()->get_segments(); int32_t pos_in_device_id = id_column_context.pos_in_device_id_; if (pos_in_device_id >= 0 && - static_cast<size_t>(pos_in_device_id - 1) < segments.size()) { - device_tag = segments[pos_in_device_id 
- 1]; + static_cast<size_t>(pos_in_device_id) < segments.size()) { + device_tag = segments[pos_in_device_id]; } if (device_tag == nullptr) { diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index 6fbd4f0f..3748b3db 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -140,14 +140,15 @@ int TableQueryExecutor::query_on_tree( if (column_types_map.find(measurement_name) == column_types_map.end()) { common::TSDataType type = ts_index->get_data_type(); - if (type == common::TSDataType::INT32 || - type == common::TSDataType::INT64 || - type == common::TSDataType::TIMESTAMP || - type == common::TSDataType::DATE) { - type = common::TSDataType::INT64; - } else if (type == common::TSDataType::FLOAT) { - type = common::TSDataType::DOUBLE; - } + // TODO(Colin): Fix type mismatch. + // if (type == common::TSDataType::INT32 || + // type == common::TSDataType::INT64 || + // type == common::TSDataType::TIMESTAMP || + // type == common::TSDataType::DATE) { + // type = common::TSDataType::INT64; + // } else if (type == common::TSDataType::FLOAT) { + // type = common::TSDataType::DOUBLE; + // } column_types_map[measurement_name] = type; } } @@ -166,6 +167,7 @@ int TableQueryExecutor::query_on_tree( } auto schema = std::make_shared<TableSchema>("default", col_schema); + schema->set_virtual_table(); std::shared_ptr<ColumnMapping> column_mapping = std::make_shared<ColumnMapping>(); for (size_t i = 0; i < col_schema.size(); ++i) {
