This is an automated email from the ASF dual-hosted git repository. colinlee pushed a commit to branch colin_support_read_tree in repository https://gitbox.apache.org/repos/asf/tsfile.git
commit 7ef45b5a2fd223dc25bad234047fe175dbda6dd3 Author: ColinLee <[email protected]> AuthorDate: Thu Nov 20 10:34:17 2025 +0800 Support query on tree and modify clang-format. --- cpp/.clang-format | 3 +- cpp/src/common/tsblock/tsblock.h | 74 +++--- cpp/src/common/tsfile_common.h | 295 +++++++++++---------- cpp/src/file/tsfile_io_reader.cc | 154 +++++------ .../reader/block/single_device_tsblock_reader.cc | 10 +- cpp/src/reader/device_meta_iterator.cc | 7 +- cpp/src/reader/device_meta_iterator.h | 10 + cpp/src/reader/imeta_data_querier.h | 5 +- cpp/src/reader/meta_data_querier.cc | 6 + cpp/src/reader/meta_data_querier.h | 3 + cpp/src/reader/table_query_executor.cc | 125 ++++++++- cpp/src/reader/table_query_executor.h | 22 +- cpp/src/reader/task/device_task_iterator.h | 15 ++ cpp/src/reader/tsfile_reader.cc | 38 ++- cpp/src/reader/tsfile_reader.h | 40 +-- .../reader/tree_view/tsfile_reader_tree_test.cc | 88 +++++- 16 files changed, 592 insertions(+), 303 deletions(-) diff --git a/cpp/.clang-format b/cpp/.clang-format index 26651a2c..da6eee61 100644 --- a/cpp/.clang-format +++ b/cpp/.clang-format @@ -65,7 +65,6 @@ ConstructorInitializerIndentWidth: 4 ContinuationIndentWidth: 4 Cpp11BracedListStyle: true DeriveLineEnding: true -DerivePointerAlignment: true DisableFormat: false EmptyLineAfterAccessModifier: Never EmptyLineBeforeAccessModifier: LogicalBlock @@ -207,7 +206,7 @@ SpacesInParentheses: false SpacesInSquareBrackets: false SpaceBeforeSquareBrackets: false BitFieldColonSpacing: Both -Standard: Auto +Standard: Cpp11 StatementAttributeLikeMacros: - Q_EMIT StatementMacros: diff --git a/cpp/src/common/tsblock/tsblock.h b/cpp/src/common/tsblock/tsblock.h index a0e94391..dce94f8a 100644 --- a/cpp/src/common/tsblock/tsblock.h +++ b/cpp/src/common/tsblock/tsblock.h @@ -44,7 +44,7 @@ class TsBlock { * information, such as insert scenarios, etc. 
Then we will use the given * number of rows */ - explicit TsBlock(TupleDesc *tupledesc, uint32_t max_row_count = 0) + explicit TsBlock(TupleDesc* tupledesc, uint32_t max_row_count = 0) : capacity_(g_config_value_.tsblock_max_memory_), row_count_(0), max_row_count_(max_row_count), @@ -60,9 +60,9 @@ class TsBlock { FORCE_INLINE uint32_t get_row_count() const { return row_count_; } - FORCE_INLINE TupleDesc *get_tuple_desc() const { return tuple_desc_; } + FORCE_INLINE TupleDesc* get_tuple_desc() const { return tuple_desc_; } - FORCE_INLINE Vector *get_vector(uint32_t index) { return vectors_[index]; } + FORCE_INLINE Vector* get_vector(uint32_t index) { return vectors_[index]; } FORCE_INLINE uint32_t get_column_count() const { return tuple_desc_->get_column_count(); @@ -104,8 +104,8 @@ class TsBlock { row_count_ = 0; } - FORCE_INLINE static int create_tsblock(TupleDesc *tupledesc, - TsBlock *&ret_tsblock, + FORCE_INLINE static int create_tsblock(TupleDesc* tupledesc, + TsBlock*& ret_tsblock, uint32_t max_row_count = 0) { int ret = common::E_OK; if (ret_tsblock == nullptr) { @@ -119,13 +119,13 @@ class TsBlock { } int init(); - void tsblock_to_json(ByteStream *byte_stream); + void tsblock_to_json(ByteStream* byte_stream); std::string debug_string(); private: int build_vector(common::TSDataType type, uint32_t row_count); - void write_data(ByteStream *__restrict byte_stream, char *__restrict val, + void write_data(ByteStream* __restrict byte_stream, char* __restrict val, uint32_t len, bool has_null, TSDataType type); private: @@ -134,13 +134,13 @@ class TsBlock { uint32_t max_row_count_; common::BitMap select_list_; - TupleDesc *tuple_desc_; - std::vector<Vector *> vectors_; + TupleDesc* tuple_desc_; + std::vector<Vector*> vectors_; }; class RowAppender { public: - explicit RowAppender(TsBlock *tsblock) : tsblock_(tsblock) {} + explicit RowAppender(TsBlock* tsblock) : tsblock_(tsblock) {} ~RowAppender() {} // todo:(yanghao) maybe need to consider select-list @@ -157,25 
+157,37 @@ class RowAppender { tsblock_->row_count_--; } - FORCE_INLINE void append(uint32_t slot_index, const char *value, + FORCE_INLINE void append(uint32_t slot_index, const char* value, uint32_t len) { ASSERT(slot_index < tsblock_->tuple_desc_->get_column_count()); - Vector *vec = tsblock_->vectors_[slot_index]; - vec->append(value, len); + Vector* vec = tsblock_->vectors_[slot_index]; + // TODO(Colin): Refine this. + TSDataType datatype = vec->get_vector_type(); + if (len == 4 && datatype == INT64) { + int32_t int32_val = *reinterpret_cast<const int32_t*>(value); + int64_t int64_val = static_cast<int64_t>(int32_val); + vec->append(reinterpret_cast<const char*>(&int64_val), 8); + } else if (len == 4 && datatype == DOUBLE) { + float float_val = *reinterpret_cast<const float*>(value); + double double_val = static_cast<double>(float_val); + vec->append(reinterpret_cast<const char*>(&double_val), 8); + } else { + vec->append(value, len); + } } FORCE_INLINE void append_null(uint32_t slot_index) { - Vector *vec = tsblock_->vectors_[slot_index]; + Vector* vec = tsblock_->vectors_[slot_index]; vec->set_null(tsblock_->row_count_ - 1); } private: - TsBlock *tsblock_; + TsBlock* tsblock_; }; class ColAppender { public: - ColAppender(uint32_t column_index, TsBlock *tsblock) + ColAppender(uint32_t column_index, TsBlock* tsblock) : column_index_(column_index), column_row_count_(0), tsblock_(tsblock) { ASSERT(column_index < tsblock_->tuple_desc_->get_column_count()); vec_ = tsblock_->vectors_[column_index]; @@ -194,7 +206,7 @@ class ColAppender { } } - FORCE_INLINE void append(const char *value, uint32_t len) { + FORCE_INLINE void append(const char* value, uint32_t len) { vec_->append(value, len); } @@ -211,7 +223,7 @@ class ColAppender { } return E_OK; } - FORCE_INLINE int fill(const char *value, uint32_t len, uint32_t end_index) { + FORCE_INLINE int fill(const char* value, uint32_t len, uint32_t end_index) { while (column_row_count_ < end_index) { if (!add_row()) { return 
E_INVALID_ARG; @@ -225,14 +237,14 @@ class ColAppender { private: uint32_t column_index_; uint32_t column_row_count_; - TsBlock *tsblock_; - Vector *vec_; + TsBlock* tsblock_; + Vector* vec_; }; // todo:(yanghao) need to deal with select-list class RowIterator { public: - explicit RowIterator(TsBlock *tsblock) : tsblock_(tsblock), row_id_(0) { + explicit RowIterator(TsBlock* tsblock) : tsblock_(tsblock), row_id_(0) { column_count_ = tsblock_->tuple_desc_->get_column_count(); } @@ -264,17 +276,17 @@ class RowIterator { FORCE_INLINE void update_row_id() { row_id_++; } - FORCE_INLINE char *read(uint32_t column_index, uint32_t *__restrict len, - bool *__restrict null) { + FORCE_INLINE char* read(uint32_t column_index, uint32_t* __restrict len, + bool* __restrict null) { ASSERT(column_index < column_count_); - Vector *vec = tsblock_->vectors_[column_index]; + Vector* vec = tsblock_->vectors_[column_index]; return vec->read(len, null, row_id_); } std::string debug_string(); // for debug private: - TsBlock *tsblock_; + TsBlock* tsblock_; uint32_t row_id_; // The line number currently being reader uint32_t column_count_; }; @@ -282,7 +294,7 @@ class RowIterator { // todo:(yanghao) need to deal with select-list class ColIterator { public: - ColIterator(uint32_t column_index, const TsBlock *tsblock) + ColIterator(uint32_t column_index, const TsBlock* tsblock) : column_index_(column_index), row_id_(0), tsblock_(tsblock) { ASSERT(column_index < tsblock_->tuple_desc_->get_column_count()); vec_ = tsblock_->vectors_[column_index]; @@ -303,22 +315,22 @@ class ColIterator { FORCE_INLINE TSDataType get_data_type() { return vec_->get_vector_type(); } - FORCE_INLINE char *read(uint32_t *__restrict len, bool *__restrict null) { + FORCE_INLINE char* read(uint32_t* __restrict len, bool* __restrict null) { return vec_->read(len, null, row_id_); } - FORCE_INLINE char *read(uint32_t *len) { return vec_->read(len); } + FORCE_INLINE char* read(uint32_t* len) { return vec_->read(len); } 
FORCE_INLINE uint32_t get_column_index() { return column_index_; } private: uint32_t column_index_; uint32_t row_id_; - const TsBlock *tsblock_; - Vector *vec_; + const TsBlock* tsblock_; + Vector* vec_; }; -int merge_tsblock_by_row(TsBlock *sea, TsBlock *river); +int merge_tsblock_by_row(TsBlock* sea, TsBlock* river); } // end namespace common #endif // COMMON_TSBLOCK_TSBLOCK_H diff --git a/cpp/src/common/tsfile_common.h b/cpp/src/common/tsfile_common.h index dd22ca40..39cd027e 100644 --- a/cpp/src/common/tsfile_common.h +++ b/cpp/src/common/tsfile_common.h @@ -39,7 +39,7 @@ namespace storage { -extern const char *MAGIC_STRING_TSFILE; +extern const char* MAGIC_STRING_TSFILE; constexpr int MAGIC_STRING_TSFILE_LEN = 6; extern const char VERSION_NUM_BYTE; extern const char CHUNK_GROUP_HEADER_MARKER; @@ -60,7 +60,7 @@ typedef int64_t TsFileID; struct PageHeader { uint32_t uncompressed_size_; uint32_t compressed_size_; - Statistic *statistic_; + Statistic* statistic_; PageHeader() : uncompressed_size_(0), compressed_size_(0), statistic_(nullptr) {} @@ -73,7 +73,7 @@ struct PageHeader { uncompressed_size_ = 0; compressed_size_ = 0; } - int deserialize_from(common::ByteStream &in, bool deserialize_stat, + int deserialize_from(common::ByteStream& in, bool deserialize_stat, common::TSDataType data_type) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_var_uint( @@ -99,7 +99,7 @@ struct PageHeader { } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) { + friend std::ostream& operator<<(std::ostream& os, const PageHeader& h) { os << "{uncompressed_size_=" << h.uncompressed_size_ << ", compressed_size_=" << h.uncompressed_size_; if (h.statistic_ == nullptr) { @@ -132,7 +132,7 @@ struct ChunkHeader { ~ChunkHeader() = default; - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_char(chunk_type_, out))) { } else 
if (RET_FAIL(common::SerializationUtil::write_var_str( @@ -148,7 +148,7 @@ struct ChunkHeader { } return ret; } - int deserialize_from(common::ByteStream &in) { + int deserialize_from(common::ByteStream& in) { int ret = common::E_OK; in.mark_read_pos(); if (RET_FAIL(common::SerializationUtil::read_char(chunk_type_, in))) { @@ -157,18 +157,18 @@ struct ChunkHeader { } else if (RET_FAIL(common::SerializationUtil::read_var_uint(data_size_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)data_type_, in))) { + (char&)data_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)compression_type_, in))) { + (char&)compression_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)encoding_type_, in))) { + (char&)encoding_type_, in))) { } else { serialized_size_ = in.get_mark_len(); } return ret; } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, const ChunkHeader &h) { + friend std::ostream& operator<<(std::ostream& os, const ChunkHeader& h) { os << "{measurement_name=" << h.measurement_name_ << ", data_size=" << h.data_size_ << ", data_type=" << h.data_type_ << ", compression_type=" << h.compression_type_ @@ -197,7 +197,7 @@ struct ChunkMeta { common::String measurement_name_; common::TSDataType data_type_; int64_t offset_of_chunk_header_; - Statistic *statistic_; + Statistic* statistic_; char mask_; common::TSEncoding encoding_; common::CompressionType compression_type_; @@ -209,10 +209,10 @@ struct ChunkMeta { statistic_(nullptr), mask_(0) {} - int init(const common::String &measurement_name, + int init(const common::String& measurement_name, common::TSDataType data_type, int64_t offset_of_chunk_header, - Statistic *stat, char mask, common::TSEncoding encoding, - common::CompressionType compression_type, common::PageArena &pa) { + Statistic* stat, char mask, common::TSEncoding encoding, + common::CompressionType compression_type, common::PageArena& pa) { // TODO check 
parameter valid measurement_name_.dup_from(measurement_name, pa); data_type_ = data_type; @@ -223,10 +223,10 @@ struct ChunkMeta { compression_type_ = compression_type; return common::E_OK; } - FORCE_INLINE void clone_statistic_from(Statistic *stat) { + FORCE_INLINE void clone_statistic_from(Statistic* stat) { clone_statistic(stat, statistic_, data_type_); } - FORCE_INLINE int clone_from(ChunkMeta &that, common::PageArena *pa) { + FORCE_INLINE int clone_from(ChunkMeta& that, common::PageArena* pa) { int ret = common::E_OK; if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) { return ret; @@ -244,7 +244,7 @@ struct ChunkMeta { mask_ = that.mask_; return ret; } - int serialize_to(common::ByteStream &out, bool serialize_statistic) { + int serialize_to(common::ByteStream& out, bool serialize_statistic) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_i64( offset_of_chunk_header_, out))) { @@ -253,8 +253,8 @@ struct ChunkMeta { } return ret; } - int deserialize_from(common::ByteStream &in, bool deserialize_stat, - common::PageArena *pa) { + int deserialize_from(common::ByteStream& in, bool deserialize_stat, + common::PageArena* pa) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_i64( offset_of_chunk_header_, in))) { @@ -270,7 +270,7 @@ struct ChunkMeta { return ret; } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, const ChunkMeta &cm) { + friend std::ostream& operator<<(std::ostream& os, const ChunkMeta& cm) { os << "{measurement_name=" << cm.measurement_name_ << ", data_type=" << cm.data_type_ << ", offset_of_chunk_header=" << cm.offset_of_chunk_header_ @@ -287,16 +287,16 @@ struct ChunkMeta { struct ChunkGroupMeta { std::shared_ptr<IDeviceID> device_id_; - common::SimpleList<ChunkMeta *> chunk_meta_list_; + common::SimpleList<ChunkMeta*> chunk_meta_list_; - explicit ChunkGroupMeta(common::PageArena *pa_ptr) + explicit ChunkGroupMeta(common::PageArena* pa_ptr) : 
chunk_meta_list_(pa_ptr) {} FORCE_INLINE int init(std::shared_ptr<IDeviceID> device_id) { device_id_ = device_id; return 0; } - FORCE_INLINE int push(ChunkMeta *cm) { + FORCE_INLINE int push(ChunkMeta* cm) { return chunk_meta_list_.push_back(cm); } }; @@ -305,13 +305,13 @@ class ITimeseriesIndex { public: ITimeseriesIndex() {} ~ITimeseriesIndex() {} - virtual common::SimpleList<ChunkMeta *> *get_chunk_meta_list() const { + virtual common::SimpleList<ChunkMeta*>* get_chunk_meta_list() const { return nullptr; } - virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const { + virtual common::SimpleList<ChunkMeta*>* get_time_chunk_meta_list() const { return nullptr; } - virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const { + virtual common::SimpleList<ChunkMeta*>* get_value_chunk_meta_list() const { return nullptr; } @@ -321,7 +321,7 @@ class ITimeseriesIndex { virtual common::TSDataType get_data_type() const { return common::INVALID_DATATYPE; } - virtual Statistic *get_statistic() const { return nullptr; } + virtual Statistic* get_statistic() const { return nullptr; } }; /* @@ -368,19 +368,18 @@ class TimeseriesIndex : public ITimeseriesIndex { } } - int add_chunk_meta(ChunkMeta *chunk_meta, bool serialize_statistic); - FORCE_INLINE int set_measurement_name(common::String &measurement_name, - common::PageArena &pa) { + int add_chunk_meta(ChunkMeta* chunk_meta, bool serialize_statistic); + FORCE_INLINE int set_measurement_name(common::String& measurement_name, + common::PageArena& pa) { return measurement_name_.dup_from(measurement_name, pa); } - FORCE_INLINE void set_measurement_name(common::String &measurement_name) { + FORCE_INLINE void set_measurement_name(common::String& measurement_name) { measurement_name_.shallow_copy_from(measurement_name); } FORCE_INLINE virtual common::String get_measurement_name() const { return measurement_name_; } - virtual inline common::SimpleList<ChunkMeta *> *get_chunk_meta_list() - const { + 
virtual inline common::SimpleList<ChunkMeta*>* get_chunk_meta_list() const { return chunk_meta_list_; } FORCE_INLINE void set_ts_meta_type(char ts_meta_type) { @@ -405,7 +404,7 @@ class TimeseriesIndex : public ITimeseriesIndex { statistic_->reset(); return common::E_OK; } - virtual Statistic *get_statistic() const { return statistic_; } + virtual Statistic* get_statistic() const { return statistic_; } common::TsID get_ts_id() const { return ts_id_; } FORCE_INLINE void finish() { @@ -413,7 +412,7 @@ class TimeseriesIndex : public ITimeseriesIndex { chunk_meta_list_serialized_buf_.total_size(); } - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_char( timeseries_meta_type_, out))) { @@ -430,14 +429,14 @@ class TimeseriesIndex : public ITimeseriesIndex { return ret; } - int deserialize_from(common::ByteStream &in, common::PageArena *pa) { + int deserialize_from(common::ByteStream& in, common::PageArena* pa) { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_char(timeseries_meta_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_mystring( measurement_name_, pa, in))) { } else if (RET_FAIL(common::SerializationUtil::read_char( - (char &)data_type_, in))) { + (char&)data_type_, in))) { } else if (RET_FAIL(common::SerializationUtil::read_var_uint( chunk_meta_list_data_size_, in))) { } else if (nullptr == @@ -447,22 +446,22 @@ class TimeseriesIndex : public ITimeseriesIndex { } else if (RET_FAIL(statistic_->deserialize_from(in))) { } else { statistic_from_pa_ = true; - void *chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_)); + void* chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_)); if (IS_NULL(chunk_meta_list_buf)) { return common::E_OOM; } const bool deserialize_chunk_meta_statistic = (timeseries_meta_type_ & 0x3F); // TODO chunk_meta_list_ = - new (chunk_meta_list_buf) common::SimpleList<ChunkMeta *>(pa); 
+ new (chunk_meta_list_buf) common::SimpleList<ChunkMeta*>(pa); uint32_t start_pos = in.read_pos(); while (IS_SUCC(ret) && in.read_pos() < start_pos + chunk_meta_list_data_size_) { - void *cm_buf = pa->alloc(sizeof(ChunkMeta)); + void* cm_buf = pa->alloc(sizeof(ChunkMeta)); if (IS_NULL(cm_buf)) { ret = common::E_OOM; } else { - ChunkMeta *cm = new (cm_buf) ChunkMeta; + ChunkMeta* cm = new (cm_buf) ChunkMeta; cm->measurement_name_.shallow_copy_from( this->measurement_name_); cm->data_type_ = this->data_type_; @@ -477,7 +476,7 @@ class TimeseriesIndex : public ITimeseriesIndex { return ret; } - int clone_from(const TimeseriesIndex &that, common::PageArena *pa) { + int clone_from(const TimeseriesIndex& that, common::PageArena* pa) { int ret = common::E_OK; timeseries_meta_type_ = that.timeseries_meta_type_; chunk_meta_list_data_size_ = that.chunk_meta_list_data_size_; @@ -496,20 +495,20 @@ class TimeseriesIndex : public ITimeseriesIndex { } if (that.chunk_meta_list_ != nullptr) { - void *buf = pa->alloc(sizeof(*chunk_meta_list_)); + void* buf = pa->alloc(sizeof(*chunk_meta_list_)); if (IS_NULL(buf)) { return common::E_OOM; } - chunk_meta_list_ = new (buf) common::SimpleList<ChunkMeta *>(pa); - common::SimpleList<ChunkMeta *>::Iterator it; + chunk_meta_list_ = new (buf) common::SimpleList<ChunkMeta*>(pa); + common::SimpleList<ChunkMeta*>::Iterator it; for (it = that.chunk_meta_list_->begin(); IS_SUCC(ret) && it != that.chunk_meta_list_->end(); it++) { - ChunkMeta *cm = it.get(); - void *cm_buf = pa->alloc(sizeof(ChunkMeta)); + ChunkMeta* cm = it.get(); + void* cm_buf = pa->alloc(sizeof(ChunkMeta)); if (IS_NULL(cm_buf)) { return common::E_OOM; } else { - ChunkMeta *my_cm = new (cm_buf) ChunkMeta; + ChunkMeta* my_cm = new (cm_buf) ChunkMeta; if (RET_FAIL(my_cm->clone_from(*cm, pa))) { } else if (RET_FAIL(chunk_meta_list_->push_back(my_cm))) { } @@ -519,8 +518,8 @@ class TimeseriesIndex : public ITimeseriesIndex { return ret; } #ifndef NDEBUG - friend std::ostream 
&operator<<(std::ostream &os, - const TimeseriesIndex &tsi) { + friend std::ostream& operator<<(std::ostream& os, + const TimeseriesIndex& tsi) { os << "{meta_type=" << (int)tsi.timeseries_meta_type_ << ", chunk_meta_list_data_size=" << tsi.chunk_meta_list_data_size_ << ", measurement_name=" << tsi.measurement_name_ @@ -531,7 +530,7 @@ class TimeseriesIndex : public ITimeseriesIndex { if (tsi.chunk_meta_list_) { os << ", chunk_meta_list={"; int count = 0; - common::SimpleList<ChunkMeta *>::Iterator it = + common::SimpleList<ChunkMeta*>::Iterator it = tsi.chunk_meta_list_->begin(); for (; it != tsi.chunk_meta_list_->end(); it++, count++) { if (count != 0) { @@ -565,24 +564,24 @@ class TimeseriesIndex : public ITimeseriesIndex { * TimeseriesIndex.statistic_ is duplicated with ChunkMeta.statistic_. In * this case, we do not serialize ChunkMeta.statistic_. */ - Statistic *statistic_; + Statistic* statistic_; bool statistic_from_pa_; common::ByteStream chunk_meta_list_serialized_buf_; // common::PageArena page_arena_; - common::SimpleList<ChunkMeta *> *chunk_meta_list_; // for deserialize_from + common::SimpleList<ChunkMeta*>* chunk_meta_list_; // for deserialize_from }; class AlignedTimeseriesIndex : public ITimeseriesIndex { public: - TimeseriesIndex *time_ts_idx_; - TimeseriesIndex *value_ts_idx_; + TimeseriesIndex* time_ts_idx_; + TimeseriesIndex* value_ts_idx_; AlignedTimeseriesIndex() {} ~AlignedTimeseriesIndex() {} - virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const { + virtual common::SimpleList<ChunkMeta*>* get_time_chunk_meta_list() const { return time_ts_idx_->get_chunk_meta_list(); } - virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const { + virtual common::SimpleList<ChunkMeta*>* get_value_chunk_meta_list() const { return value_ts_idx_->get_chunk_meta_list(); } @@ -592,13 +591,13 @@ class AlignedTimeseriesIndex : public ITimeseriesIndex { virtual common::TSDataType get_data_type() const { return 
time_ts_idx_->get_data_type(); } - virtual Statistic *get_statistic() const { + virtual Statistic* get_statistic() const { return value_ts_idx_->get_statistic(); } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const AlignedTimeseriesIndex &tsi) { + friend std::ostream& operator<<(std::ostream& os, + const AlignedTimeseriesIndex& tsi) { os << "time_ts_idx=" << *tsi.time_ts_idx_; os << ", value_ts_idx=" << *tsi.value_ts_idx_; return os; @@ -609,7 +608,7 @@ class AlignedTimeseriesIndex : public ITimeseriesIndex { class TSMIterator { public: explicit TSMIterator( - common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list) + common::SimpleList<ChunkGroupMeta*>& chunk_group_meta_list) : chunk_group_meta_list_(chunk_group_meta_list), chunk_group_meta_iter_(), chunk_meta_iter_() {} @@ -617,38 +616,38 @@ class TSMIterator { // sort => iterate int init(); bool has_next() const; - int get_next(std::shared_ptr<IDeviceID> &ret_device_id, - common::String &ret_measurement_name, - TimeseriesIndex &ret_ts_index); + int get_next(std::shared_ptr<IDeviceID>& ret_device_id, + common::String& ret_measurement_name, + TimeseriesIndex& ret_ts_index); private: - common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list_; - common::SimpleList<ChunkGroupMeta *>::Iterator chunk_group_meta_iter_; - common::SimpleList<ChunkMeta *>::Iterator chunk_meta_iter_; + common::SimpleList<ChunkGroupMeta*>& chunk_group_meta_list_; + common::SimpleList<ChunkGroupMeta*>::Iterator chunk_group_meta_iter_; + common::SimpleList<ChunkMeta*>::Iterator chunk_meta_iter_; // timeseries measurenemnt chunk meta info // map <device_name, <measurement_name, vector<chunk_meta>>> std::map<std::shared_ptr<IDeviceID>, - std::map<common::String, std::vector<ChunkMeta *>>> + std::map<common::String, std::vector<ChunkMeta*>>> tsm_chunk_meta_info_; // device iterator std::map<std::shared_ptr<IDeviceID>, - std::map<common::String, std::vector<ChunkMeta *>>>::iterator + std::map<common::String, 
std::vector<ChunkMeta*>>>::iterator tsm_device_iter_; // measurement iterator - std::map<common::String, std::vector<ChunkMeta *>>::iterator + std::map<common::String, std::vector<ChunkMeta*>>::iterator tsm_measurement_iter_; }; /* =============== TsFile Index ================ */ struct IComparable { virtual ~IComparable() = default; - virtual bool operator<(const IComparable &other) const = 0; - virtual bool operator>(const IComparable &other) const = 0; - virtual bool operator==(const IComparable &other) const = 0; - virtual int compare(const IComparable &other) { + virtual bool operator<(const IComparable& other) const = 0; + virtual bool operator>(const IComparable& other) const = 0; + virtual bool operator==(const IComparable& other) const = 0; + virtual int compare(const IComparable& other) { if (this->operator<(other)) { return -1; } else if (this->operator==(other)) { @@ -663,27 +662,27 @@ struct IComparable { struct DeviceIDComparable : IComparable { std::shared_ptr<IDeviceID> device_id_; - explicit DeviceIDComparable(const std::shared_ptr<IDeviceID> &device_id) + explicit DeviceIDComparable(const std::shared_ptr<IDeviceID>& device_id) : device_id_(device_id) {} - bool operator<(const IComparable &other) const override { - const auto *other_device = - dynamic_cast<const DeviceIDComparable *>(&other); + bool operator<(const IComparable& other) const override { + const auto* other_device = + dynamic_cast<const DeviceIDComparable*>(&other); if (!other_device) throw std::runtime_error("Incompatible comparison"); return *device_id_ < *other_device->device_id_; } - bool operator>(const IComparable &other) const override { - const auto *other_device = - dynamic_cast<const DeviceIDComparable *>(&other); + bool operator>(const IComparable& other) const override { + const auto* other_device = + dynamic_cast<const DeviceIDComparable*>(&other); if (!other_device) throw std::runtime_error("Incompatible comparison"); return *device_id_ != *other_device->device_id_ && 
!(*device_id_ < *other_device->device_id_); } - bool operator==(const IComparable &other) const override { - const auto *other_device = - dynamic_cast<const DeviceIDComparable *>(&other); + bool operator==(const IComparable& other) const override { + const auto* other_device = + dynamic_cast<const DeviceIDComparable*>(&other); if (!other_device) throw std::runtime_error("Incompatible comparison"); return *device_id_ == *other_device->device_id_; } @@ -696,25 +695,25 @@ struct DeviceIDComparable : IComparable { struct StringComparable : IComparable { std::string value_; - explicit StringComparable(const std::string &value) : value_(value) {} + explicit StringComparable(const std::string& value) : value_(value) {} - bool operator<(const IComparable &other) const override { - const auto *other_string = - dynamic_cast<const StringComparable *>(&other); + bool operator<(const IComparable& other) const override { + const auto* other_string = + dynamic_cast<const StringComparable*>(&other); if (!other_string) throw std::runtime_error("Incompatible comparison"); return value_ < other_string->value_; } - bool operator>(const IComparable &other) const override { - const auto *other_string = - dynamic_cast<const StringComparable *>(&other); + bool operator>(const IComparable& other) const override { + const auto* other_string = + dynamic_cast<const StringComparable*>(&other); if (!other_string) throw std::runtime_error("Incompatible comparison"); return value_ > other_string->value_; } - bool operator==(const IComparable &other) const override { - const auto *other_string = - dynamic_cast<const StringComparable *>(&other); + bool operator==(const IComparable& other) const override { + const auto* other_string = + dynamic_cast<const StringComparable*>(&other); if (!other_string) throw std::runtime_error("Incompatible comparison"); return value_ == other_string->value_; } @@ -723,7 +722,7 @@ struct StringComparable : IComparable { }; struct IMetaIndexEntry { - static void 
self_destructor(IMetaIndexEntry *ptr) { + static void self_destructor(IMetaIndexEntry* ptr) { if (ptr) { ptr->~IMetaIndexEntry(); } @@ -731,9 +730,9 @@ struct IMetaIndexEntry { IMetaIndexEntry() = default; virtual ~IMetaIndexEntry() = default; - virtual int serialize_to(common::ByteStream &out) { return common::E_OK; } - virtual int deserialize_from(common::ByteStream &out, - common::PageArena *pa) { + virtual int serialize_to(common::ByteStream& out) { return common::E_OK; } + virtual int deserialize_from(common::ByteStream& out, + common::PageArena* pa) { return common::E_NOT_SUPPORT; } virtual int64_t get_offset() const = 0; @@ -743,11 +742,11 @@ struct IMetaIndexEntry { } virtual common::String get_name() const { return {}; } virtual std::shared_ptr<IDeviceID> get_device_id() const { return nullptr; } - virtual std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) = 0; + virtual std::shared_ptr<IMetaIndexEntry> clone(common::PageArena* pa) = 0; #ifndef NDEBUG - virtual void print(std::ostream &os) const {} - friend std::ostream &operator<<(std::ostream &os, - const IMetaIndexEntry &entry) { + virtual void print(std::ostream& os) const {} + friend std::ostream& operator<<(std::ostream& os, + const IMetaIndexEntry& entry) { entry.print(os); return os; } @@ -760,19 +759,19 @@ struct DeviceMetaIndexEntry : IMetaIndexEntry { DeviceMetaIndexEntry() = default; - DeviceMetaIndexEntry(const std::shared_ptr<IDeviceID> &device_id, + DeviceMetaIndexEntry(const std::shared_ptr<IDeviceID>& device_id, const int64_t offset) : device_id_(device_id), offset_(offset) {} ~DeviceMetaIndexEntry() override = default; - static void self_deleter(DeviceMetaIndexEntry *ptr) { + static void self_deleter(DeviceMetaIndexEntry* ptr) { if (ptr) { ptr->~DeviceMetaIndexEntry(); } } - int serialize_to(common::ByteStream &out) override { + int serialize_to(common::ByteStream& out) override { int ret = common::E_OK; if (RET_FAIL(device_id_->serialize(out))) { } else if (RET_FAIL( @@ 
-781,10 +780,10 @@ struct DeviceMetaIndexEntry : IMetaIndexEntry { return ret; } - std::shared_ptr<IDeviceID> &get_device_id() { return device_id_; } + std::shared_ptr<IDeviceID>& get_device_id() { return device_id_; } - int deserialize_from(common::ByteStream &in, - common::PageArena *pa) override { + int deserialize_from(common::ByteStream& in, + common::PageArena* pa) override { int ret = common::E_OK; device_id_ = std::make_shared<StringArrayDeviceID>("init"); if (RET_FAIL(device_id_->deserialize(in))) { @@ -804,11 +803,11 @@ struct DeviceMetaIndexEntry : IMetaIndexEntry { std::shared_ptr<IDeviceID> get_device_id() const override { return device_id_; } - std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) override { + std::shared_ptr<IMetaIndexEntry> clone(common::PageArena* pa) override { return std::make_shared<DeviceMetaIndexEntry>(device_id_, offset_); } #ifndef NDEBUG - void print(std::ostream &os) const override { + void print(std::ostream& os) const override { os << "name=" << device_id_ << ", offset=" << offset_; } #endif @@ -821,19 +820,19 @@ struct MeasurementMetaIndexEntry : IMetaIndexEntry { ~MeasurementMetaIndexEntry() override = default; MeasurementMetaIndexEntry() = default; - MeasurementMetaIndexEntry(const common::String &name, const int64_t offset, - common::PageArena &pa) { + MeasurementMetaIndexEntry(const common::String& name, const int64_t offset, + common::PageArena& pa) { offset_ = offset; name_.dup_from(name, pa); } - FORCE_INLINE int init(const std::string &str, const int64_t offset, - common::PageArena &pa) { + FORCE_INLINE int init(const std::string& str, const int64_t offset, + common::PageArena& pa) { offset_ = offset; return name_.dup_from(str, pa); } - int serialize_to(common::ByteStream &out) override { + int serialize_to(common::ByteStream& out) override { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::write_mystring(name_, out))) { } else if (RET_FAIL( @@ -842,8 +841,8 @@ struct 
MeasurementMetaIndexEntry : IMetaIndexEntry { return ret; } - int deserialize_from(common::ByteStream &in, - common::PageArena *pa) override { + int deserialize_from(common::ByteStream& in, + common::PageArena* pa) override { int ret = common::E_OK; if (RET_FAIL(common::SerializationUtil::read_mystring(name_, pa, in))) { } else if (RET_FAIL(common::SerializationUtil::read_i64(offset_, in))) { @@ -863,11 +862,11 @@ struct MeasurementMetaIndexEntry : IMetaIndexEntry { std::shared_ptr<IDeviceID> get_device_id() const override { return nullptr; } - std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) override { + std::shared_ptr<IMetaIndexEntry> clone(common::PageArena* pa) override { return std::make_shared<MeasurementMetaIndexEntry>(name_, offset_, *pa); } #ifndef NDEBUG - void print(std::ostream &os) const override { + void print(std::ostream& os) const override { os << "name=" << name_ << ", offset=" << offset_; } #endif @@ -881,7 +880,7 @@ enum MetaIndexNodeType { INVALID_META_NODE_TYPE = 4, }; #ifndef NDEBUG -static const char *meta_index_node_type_names[5] = { +static const char* meta_index_node_type_names[5] = { "INTERNAL_DEVICE", "LEAF_DEVICE", "INTERNAL_MEASUREMENT", "LEAF_MEASUREMENT", "INVALID_META_NODE_TYPE"}; #endif @@ -892,9 +891,9 @@ struct MetaIndexNode { std::vector<std::shared_ptr<IMetaIndexEntry>> children_; int64_t end_offset_; MetaIndexNodeType node_type_; - common::PageArena *pa_; + common::PageArena* pa_; - explicit MetaIndexNode(common::PageArena *pa) + explicit MetaIndexNode(common::PageArena* pa) : children_(), end_offset_(0), node_type_(), pa_(pa) {} std::shared_ptr<IMetaIndexEntry> peek() { @@ -906,7 +905,7 @@ struct MetaIndexNode { ~MetaIndexNode() {} - static void self_deleter(MetaIndexNode *ptr) { + static void self_deleter(MetaIndexNode* ptr) { if (ptr) { ptr->~MetaIndexNode(); } @@ -914,10 +913,10 @@ struct MetaIndexNode { int binary_search_children( std::shared_ptr<IComparable> key, bool exact_search, - 
std::shared_ptr<IMetaIndexEntry> &ret_index_entry, - int64_t &ret_end_offset); + std::shared_ptr<IMetaIndexEntry>& ret_index_entry, + int64_t& ret_end_offset); - int serialize_to(common::ByteStream &out) { + int serialize_to(common::ByteStream& out) { int ret = common::E_OK; #if DEBUG_SE int64_t start_pos = out.total_size(); @@ -946,12 +945,12 @@ struct MetaIndexNode { return ret; } - int deserialize_from(const char *buf, int len) { + int deserialize_from(const char* buf, int len) { common::ByteStream bs; bs.wrap_from(buf, len); return deserialize_from(bs); } - int deserialize_from(common::ByteStream &in) { + int deserialize_from(common::ByteStream& in) { int ret = common::E_OK; uint32_t children_size = 0; if (RET_FAIL( @@ -959,7 +958,7 @@ struct MetaIndexNode { return ret; } for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { - void *entry_buf = pa_->alloc(sizeof(MeasurementMetaIndexEntry)); + void* entry_buf = pa_->alloc(sizeof(MeasurementMetaIndexEntry)); if (IS_NULL(entry_buf)) { return common::E_OOM; } @@ -987,12 +986,12 @@ struct MetaIndexNode { #endif return ret; } - int device_deserialize_from(const char *buf, int len) { + int device_deserialize_from(const char* buf, int len) { common::ByteStream bs; bs.wrap_from(buf, len); return device_deserialize_from(bs); } - int device_deserialize_from(common::ByteStream &in) { + int device_deserialize_from(common::ByteStream& in) { int ret = common::E_OK; uint32_t children_size = 0; if (RET_FAIL( @@ -1000,11 +999,11 @@ struct MetaIndexNode { return ret; } for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { - void *entry_buf = pa_->alloc(sizeof(DeviceMetaIndexEntry)); + void* entry_buf = pa_->alloc(sizeof(DeviceMetaIndexEntry)); if (IS_NULL(entry_buf)) { return common::E_OOM; } - auto *entry_ptr = new (entry_buf) DeviceMetaIndexEntry(); + auto* entry_ptr = new (entry_buf) DeviceMetaIndexEntry(); auto entry = std::shared_ptr<DeviceMetaIndexEntry>( entry_ptr, DeviceMetaIndexEntry::self_deleter); if 
(RET_FAIL(entry->deserialize_from(in, pa_))) { @@ -1030,8 +1029,8 @@ struct MetaIndexNode { } #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const MetaIndexNode &node) { + friend std::ostream& operator<<(std::ostream& os, + const MetaIndexNode& node) { os << "end_offset=" << node.end_offset_ << ", node_type=" << meta_index_node_type_names[node.node_type_]; @@ -1073,16 +1072,16 @@ struct TsFileMeta { DeviceNodeMap; std::map<std::string, std::shared_ptr<MetaIndexNode>> table_metadata_index_node_map_; - std::unordered_map<std::string, std::string *> tsfile_properties_; + std::unordered_map<std::string, std::string*> tsfile_properties_; typedef std::unordered_map<std::string, std::shared_ptr<TableSchema>> TableSchemasMap; TableSchemasMap table_schemas_; int64_t meta_offset_; - BloomFilter *bloom_filter_; - common::PageArena *page_arena_; + BloomFilter* bloom_filter_; + common::PageArena* page_arena_; - int get_table_metaindex_node(const std::string &table_name, - MetaIndexNode *&ret_node) { + int get_table_metaindex_node(const std::string& table_name, + MetaIndexNode*& ret_node) { std::map<std::string, std::shared_ptr<MetaIndexNode>>::iterator it = table_metadata_index_node_map_.find(table_name); if (it == table_metadata_index_node_map_.end()) { @@ -1092,8 +1091,8 @@ struct TsFileMeta { return common::E_OK; } - int get_table_schema(const std::string &table_name, - std::shared_ptr<TableSchema> &ret_schema) { + int get_table_schema(const std::string& table_name, + std::shared_ptr<TableSchema>& ret_schema) { TableSchemasMap::iterator it = table_schemas_.find(table_name); if (it == table_schemas_.end()) { return common::E_TABLE_NOT_EXIST; @@ -1105,7 +1104,7 @@ struct TsFileMeta { TsFileMeta() : meta_offset_(0), bloom_filter_(nullptr), page_arena_(nullptr) {} - explicit TsFileMeta(common::PageArena *pa) + explicit TsFileMeta(common::PageArena* pa) : meta_offset_(0), bloom_filter_(nullptr), page_arena_(pa) {} ~TsFileMeta() { if (bloom_filter_ != 
nullptr) { @@ -1114,19 +1113,21 @@ struct TsFileMeta { for (auto properties : tsfile_properties_) { if (properties.second != nullptr) { delete properties.second; + properties.second = nullptr; } } + tsfile_properties_.clear(); table_metadata_index_node_map_.clear(); table_schemas_.clear(); } - int serialize_to(common::ByteStream &out); + int serialize_to(common::ByteStream& out); - int deserialize_from(common::ByteStream &in); + int deserialize_from(common::ByteStream& in); #ifndef NDEBUG - friend std::ostream &operator<<(std::ostream &os, - const TsFileMeta &tsfile_meta) { + friend std::ostream& operator<<(std::ostream& os, + const TsFileMeta& tsfile_meta) { os << "meta_offset=" << tsfile_meta.meta_offset_; return os; } diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index 273f09a4..92c05c92 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -24,7 +24,7 @@ using namespace common; namespace storage { -int TsFileIOReader::init(const std::string &file_path) { +int TsFileIOReader::init(const std::string& file_path) { int ret = E_OK; read_file_ = new ReadFile; read_file_created_ = true; @@ -33,7 +33,7 @@ int TsFileIOReader::init(const std::string &file_path) { return ret; } -int TsFileIOReader::init(ReadFile *read_file) { +int TsFileIOReader::init(ReadFile* read_file) { if (IS_NULL(read_file)) { ASSERT(false); return E_INVALID_ARG; @@ -56,9 +56,9 @@ void TsFileIOReader::reset() { } int TsFileIOReader::alloc_ssi(std::shared_ptr<IDeviceID> device_id, - const std::string &measurement_name, - TsFileSeriesScanIterator *&ssi, - common::PageArena &pa, Filter *time_filter) { + const std::string& measurement_name, + TsFileSeriesScanIterator*& ssi, + common::PageArena& pa, Filter* time_filter) { int ret = E_OK; if (RET_FAIL(load_tsfile_meta_if_necessary())) { } else { @@ -80,7 +80,7 @@ int TsFileIOReader::alloc_ssi(std::shared_ptr<IDeviceID> device_id, return ret; } -void 
TsFileIOReader::revert_ssi(TsFileSeriesScanIterator *ssi) { +void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator* ssi) { if (ssi != nullptr) { ssi->destroy(); delete ssi; @@ -89,12 +89,12 @@ void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator *ssi) { int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( std::shared_ptr<IDeviceID> device_id, - std::vector<ITimeseriesIndex *> &timeseries_indexs, PageArena &pa) { + std::vector<ITimeseriesIndex*>& timeseries_indexs, PageArena& pa) { int ret = E_OK; load_tsfile_meta_if_necessary(); std::shared_ptr<IMetaIndexEntry> meta_index_entry; int64_t end_offset; - std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > + std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t>> meta_index_entry_list; if (RET_FAIL(load_device_index_entry( std::make_shared<DeviceIDComparable>(device_id), meta_index_entry, @@ -108,8 +108,8 @@ int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( return ret; } -bool TsFileIOReader::filter_stasify(ITimeseriesIndex *ts_index, - Filter *time_filter) { +bool TsFileIOReader::filter_stasify(ITimeseriesIndex* ts_index, + Filter* time_filter) { ASSERT(ts_index->get_statistic() != nullptr); return time_filter->satisfy(ts_index->get_statistic()); } @@ -141,7 +141,7 @@ int TsFileIOReader::load_tsfile_meta() { // Step 1: reader the tsfile_meta_size // 1.1 prepare reader buffer int32_t alloc_size = UTIL_MIN(TSFILE_READ_IO_SIZE, file_size()); - char *read_buf = (char *)mem_alloc(alloc_size, MOD_TSFILE_READER); + char* read_buf = (char*)mem_alloc(alloc_size, MOD_TSFILE_READER); if (IS_NULL(read_buf)) { return E_OOM; } @@ -159,7 +159,7 @@ int TsFileIOReader::load_tsfile_meta() { // 1.3 deserialize tsfile_meta_size if (IS_SUCC(ret)) { // deserialize tsfile_meta_size - char *size_buf = read_buf + alloc_size - TAIL_MAGIC_AND_META_SIZE_SIZE; + char* size_buf = read_buf + alloc_size - TAIL_MAGIC_AND_META_SIZE_SIZE; tsfile_meta_size = SerializationUtil::read_ui32(size_buf);
ASSERT(tsfile_meta_size > 0 && tsfile_meta_size <= (1ll << 20)); } @@ -167,12 +167,12 @@ int TsFileIOReader::load_tsfile_meta() { // Step 2: reader TsFileMeta if (IS_SUCC(ret)) { // 2.1 prepare enough buffer (use the previous buffer if can). - char *tsfile_meta_buf = nullptr; + char* tsfile_meta_buf = nullptr; if (tsfile_meta_size + TAIL_MAGIC_AND_META_SIZE_SIZE > (uint32_t)alloc_size) { // prepare buffer to re-reader from start of tsfile_meta - char *old_read_buf = read_buf; - read_buf = (char *)mem_realloc(read_buf, tsfile_meta_size); + char* old_read_buf = read_buf; + read_buf = (char*)mem_realloc(read_buf, tsfile_meta_size); if (IS_NULL(read_buf)) { read_buf = old_read_buf; ret = E_OOM; @@ -211,8 +211,8 @@ int TsFileIOReader::load_tsfile_meta() { } int TsFileIOReader::load_timeseries_index_for_ssi( - std::shared_ptr<IDeviceID> device_id, const std::string &measurement_name, - TsFileSeriesScanIterator *&ssi) { + std::shared_ptr<IDeviceID> device_id, const std::string& measurement_name, + TsFileSeriesScanIterator*& ssi) { int ret = E_OK; std::shared_ptr<IMetaIndexEntry> device_index_entry; int64_t device_ie_end_offset = 0; @@ -224,19 +224,19 @@ int TsFileIOReader::load_timeseries_index_for_ssi( device_ie_end_offset))) { return ret; } - auto &pa = ssi->timeseries_index_pa_; + auto& pa = ssi->timeseries_index_pa_; int start_offset = device_index_entry->get_offset(), end_offset = device_ie_end_offset; ASSERT(start_offset < end_offset); const int32_t read_size = end_offset - start_offset; int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } - auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); auto top_node = 
std::shared_ptr<MetaIndexNode>(top_node_ptr, MetaIndexNode::self_deleter); @@ -248,7 +248,7 @@ int TsFileIOReader::load_timeseries_index_for_ssi( } bool is_aligned = is_aligned_device(top_node); - TimeseriesIndex *timeseries_index = nullptr; + TimeseriesIndex* timeseries_index = nullptr; if (is_aligned) { if (RET_FAIL( get_time_column_metadata(top_node, timeseries_index, pa))) { @@ -267,8 +267,8 @@ int TsFileIOReader::load_timeseries_index_for_ssi( return ret; } if (is_aligned) { - auto *aligned_timeseries_index = - dynamic_cast<AlignedTimeseriesIndex *>(ssi->itimeseries_index_); + auto* aligned_timeseries_index = + dynamic_cast<AlignedTimeseriesIndex*>(ssi->itimeseries_index_); if (aligned_timeseries_index) { aligned_timeseries_index->time_ts_idx_ = timeseries_index; } @@ -277,10 +277,10 @@ int TsFileIOReader::load_timeseries_index_for_ssi( #if DEBUG_SE if (measurement_index_entry.name_.len_) { std::cout << "load timeseries index: " - << *((TimeseriesIndex *)ssi->itimeseries_index_) << std::endl; + << *((TimeseriesIndex*)ssi->itimeseries_index_) << std::endl; } else { std::cout << "load aligned timeseries index: " - << *((AlignedTimeseriesIndex *)ssi->itimeseries_index_) + << *((AlignedTimeseriesIndex*)ssi->itimeseries_index_) << std::endl; } #endif @@ -289,7 +289,7 @@ int TsFileIOReader::load_timeseries_index_for_ssi( int TsFileIOReader::load_device_index_entry( std::shared_ptr<IComparable> device_name, - std::shared_ptr<IMetaIndexEntry> &device_index_entry, int64_t &end_offset) { + std::shared_ptr<IMetaIndexEntry>& device_index_entry, int64_t& end_offset) { int ret = E_OK; std::shared_ptr<DeviceIDComparable> device_id_comparable = std::dynamic_pointer_cast<DeviceIDComparable>(device_name); @@ -322,10 +322,10 @@ int TsFileIOReader::load_device_index_entry( } int TsFileIOReader::load_measurement_index_entry( - const std::string &measurement_name_str, + const std::string& measurement_name_str, std::shared_ptr<MetaIndexNode> top_node, - 
std::shared_ptr<IMetaIndexEntry> &ret_measurement_index_entry, - int64_t &ret_end_offset) { + std::shared_ptr<IMetaIndexEntry>& ret_measurement_index_entry, + int64_t& ret_end_offset) { int ret = E_OK; // search from top_node in top-down way auto measurement_name = @@ -346,9 +346,9 @@ int TsFileIOReader::load_measurement_index_entry( } int TsFileIOReader::load_all_measurement_index_entry( - int64_t start_offset, int64_t end_offset, common::PageArena &pa, - std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > - &ret_measurement_index_entry) { + int64_t start_offset, int64_t end_offset, common::PageArena& pa, + std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t>>& + ret_measurement_index_entry) { #if DEBUG_SE std::cout << "load_measurement_index_entry: measurement_name_str= " << ", start_offset=" << start_offset @@ -359,12 +359,12 @@ int TsFileIOReader::load_all_measurement_index_entry( // 1. load top measuremnt_index_node const int32_t read_size = (int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } - auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr, MetaIndexNode::self_deleter); if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, @@ -389,15 +389,15 @@ int TsFileIOReader::load_all_measurement_index_entry( int TsFileIOReader::read_device_meta_index(int32_t start_offset, int32_t end_offset, - common::PageArena &pa, - MetaIndexNode *&device_meta_index, + common::PageArena& pa, + MetaIndexNode*& device_meta_index, bool leaf) { int ret = E_OK; ASSERT(start_offset < end_offset); const int32_t read_size = 
(int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } @@ -415,8 +415,8 @@ int TsFileIOReader::read_device_meta_index(int32_t start_offset, int TsFileIOReader::get_timeseries_indexes( std::shared_ptr<IDeviceID> device_id, - const std::unordered_set<std::string> &measurement_names, - std::vector<ITimeseriesIndex *> &timeseries_indexs, common::PageArena &pa) { + const std::unordered_set<std::string>& measurement_names, + std::vector<ITimeseriesIndex*>& timeseries_indexs, common::PageArena& pa) { int ret = E_OK; std::shared_ptr<IMetaIndexEntry> device_index_entry; int64_t device_ie_end_offset = 0; @@ -433,12 +433,12 @@ int TsFileIOReader::get_timeseries_indexes( ASSERT(start_offset < end_offset); const int32_t read_size = end_offset - start_offset; int32_t ret_read_len = 0; - char *data_buf = (char *)pa.alloc(read_size); - void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)pa.alloc(read_size); + void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { return E_OOM; } - auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); + auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr, MetaIndexNode::self_deleter); @@ -450,13 +450,13 @@ int TsFileIOReader::get_timeseries_indexes( } bool is_aligned = is_aligned_device(top_node); - TimeseriesIndex *timeseries_index = nullptr; + TimeseriesIndex* timeseries_index = nullptr; if (is_aligned) { get_time_column_metadata(top_node, timeseries_index, pa); } int64_t idx = 0; - for (const auto &measurement_name : measurement_names) { + for (const auto& measurement_name : measurement_names) { if
(RET_FAIL(load_measurement_index_entry(measurement_name, top_node, measurement_index_entry, measurement_ie_end_offset))) { @@ -466,8 +466,8 @@ int TsFileIOReader::get_timeseries_indexes( is_aligned))) { } if (is_aligned) { - AlignedTimeseriesIndex *aligned_timeseries_index = - dynamic_cast<AlignedTimeseriesIndex *>(timeseries_indexs[idx]); + AlignedTimeseriesIndex* aligned_timeseries_index = + dynamic_cast<AlignedTimeseriesIndex*>(timeseries_indexs[idx]); if (aligned_timeseries_index) { aligned_timeseries_index->time_ts_idx_ = timeseries_index; } @@ -485,8 +485,8 @@ int TsFileIOReader::get_timeseries_indexes( int TsFileIOReader::search_from_leaf_node( std::shared_ptr<IComparable> target_name, std::shared_ptr<MetaIndexNode> index_node, - std::shared_ptr<IMetaIndexEntry> &ret_index_entry, - int64_t &ret_end_offset) { + std::shared_ptr<IMetaIndexEntry>& ret_index_entry, + int64_t& ret_end_offset) { int ret = E_OK; ret = index_node->binary_search_children(target_name, true, ret_index_entry, ret_end_offset); @@ -496,8 +496,8 @@ int TsFileIOReader::search_from_leaf_node( int TsFileIOReader::search_from_internal_node( std::shared_ptr<IComparable> target_name, bool is_device, std::shared_ptr<MetaIndexNode> index_node, - std::shared_ptr<IMetaIndexEntry> &ret_index_entry, - int64_t &ret_end_offset) { + std::shared_ptr<IMetaIndexEntry>& ret_index_entry, + int64_t& ret_end_offset) { int ret = E_OK; std::shared_ptr<IMetaIndexEntry> index_entry; int64_t end_offset = 0; @@ -519,12 +519,12 @@ int TsFileIOReader::search_from_internal_node( #endif ASSERT(read_size > 0 && read_size < (1 << 30)); PageArena cur_level_index_node_pa; - void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); - char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size); + void* buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)cur_level_index_node_pa.alloc(read_size); if (IS_NULL(buf) || IS_NULL(data_buf)) { return E_OOM; } - MetaIndexNode 
*cur_level_index_node = + MetaIndexNode* cur_level_index_node = new (buf) MetaIndexNode(&cur_level_index_node_pa); int32_t ret_read_len = 0; if (RET_FAIL(read_file_->read(index_entry->get_offset(), data_buf, @@ -569,12 +569,12 @@ bool TsFileIOReader::is_aligned_device( int TsFileIOReader::get_time_column_metadata( std::shared_ptr<MetaIndexNode> measurement_node, - TimeseriesIndex *&ret_timeseries_index, PageArena &pa) { + TimeseriesIndex*& ret_timeseries_index, PageArena& pa) { int ret = E_OK; if (!is_aligned_device(measurement_node)) { return ret; } - char *ti_buf = nullptr; + char* ti_buf = nullptr; int start_idx = 0, end_idx = 0; int ret_read_len = 0; if (measurement_node->node_type_ == LEAF_MEASUREMENT) { @@ -597,7 +597,7 @@ int TsFileIOReader::get_time_column_metadata( } } buffer.wrap_from(ti_buf, end_idx - start_idx); - void *buf = pa.alloc(sizeof(TimeseriesIndex)); + void* buf = pa.alloc(sizeof(TimeseriesIndex)); if (IS_NULL(buf)) { return E_OOM; } @@ -621,14 +621,14 @@ int TsFileIOReader::get_time_column_metadata( } int TsFileIOReader::do_load_timeseries_index( - const std::string &measurement_name_str, int64_t start_offset, - int64_t end_offset, PageArena &in_timeseries_index_pa, - ITimeseriesIndex *&ret_timeseries_index, bool is_aligned) { + const std::string& measurement_name_str, int64_t start_offset, + int64_t end_offset, PageArena& in_timeseries_index_pa, + ITimeseriesIndex*& ret_timeseries_index, bool is_aligned) { ASSERT(end_offset > start_offset); int ret = E_OK; int32_t read_size = (int32_t)(end_offset - start_offset); int32_t ret_read_len = 0; - char *ti_buf = (char *)mem_alloc(read_size, MOD_TSFILE_READER); + char* ti_buf = (char*)mem_alloc(read_size, MOD_TSFILE_READER); if (IS_NULL(ti_buf)) { return E_OOM; } @@ -638,7 +638,7 @@ int TsFileIOReader::do_load_timeseries_index( ByteStream bs; bs.wrap_from(ti_buf, read_size); const String target_measurement_name( - (char *)measurement_name_str.c_str(), + (char*)measurement_name_str.c_str(), 
strlen(measurement_name_str.c_str())); bool found = false; #if DEBUG_SE @@ -654,12 +654,12 @@ int TsFileIOReader::do_load_timeseries_index( } else if (is_aligned && cur_timeseries_index.get_measurement_name().equal_to( target_measurement_name)) { - void *buf = in_timeseries_index_pa.alloc( + void* buf = in_timeseries_index_pa.alloc( sizeof(AlignedTimeseriesIndex)); if (IS_NULL(buf)) { return E_OOM; } - AlignedTimeseriesIndex *aligned_ts_idx = + AlignedTimeseriesIndex* aligned_ts_idx = new (buf) AlignedTimeseriesIndex; buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); if (IS_NULL(buf)) { @@ -674,7 +674,7 @@ int TsFileIOReader::do_load_timeseries_index( } else if (!is_aligned && cur_timeseries_index.get_measurement_name().equal_to( target_measurement_name)) { - void *buf = + void* buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); auto ts_idx = new (buf) TimeseriesIndex; ts_idx->clone_from(cur_timeseries_index, @@ -693,12 +693,12 @@ int TsFileIOReader::do_load_timeseries_index( } int TsFileIOReader::do_load_all_timeseries_index( - std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > - &index_node_entry_list, - common::PageArena &in_timeseries_index_pa, - std::vector<ITimeseriesIndex *> &ts_indexs) { + std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t>>& + index_node_entry_list, + common::PageArena& in_timeseries_index_pa, + std::vector<ITimeseriesIndex*>& ts_indexs) { int ret = E_OK; - for (const auto &index_node_entry : index_node_entry_list) { + for (const auto& index_node_entry : index_node_entry_list) { int64_t start_offset = index_node_entry.first->get_offset(), end_offset = index_node_entry.second; int32_t read_size = (int32_t)(end_offset - start_offset); @@ -729,8 +729,8 @@ int TsFileIOReader::do_load_all_timeseries_index( int TsFileIOReader::get_all_leaf( std::shared_ptr<MetaIndexNode> index_node, - std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > - &index_node_entry_list) { + 
std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t>>& + index_node_entry_list) { int ret = E_OK; if (index_node->node_type_ == LEAF_MEASUREMENT || index_node->node_type_ == LEAF_DEVICE) { @@ -760,12 +760,12 @@ int TsFileIOReader::get_all_leaf( #endif ASSERT(read_size > 0 && read_size < (1 << 30)); PageArena cur_level_index_node_pa; - void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); - char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size); + void* buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); + char* data_buf = (char*)cur_level_index_node_pa.alloc(read_size); if (IS_NULL(buf) || IS_NULL(data_buf)) { return E_OOM; } - auto *cur_level_index_node_ptr = + auto* cur_level_index_node_ptr = new (buf) MetaIndexNode(&cur_level_index_node_pa); auto cur_level_index_node = std::shared_ptr<MetaIndexNode>( cur_level_index_node_ptr, MetaIndexNode::self_deleter); diff --git a/cpp/src/reader/block/single_device_tsblock_reader.cc b/cpp/src/reader/block/single_device_tsblock_reader.cc index 1df563cd..4137006e 100644 --- a/cpp/src/reader/block/single_device_tsblock_reader.cc +++ b/cpp/src/reader/block/single_device_tsblock_reader.cc @@ -214,8 +214,14 @@ int SingleDeviceTsBlockReader::fill_ids() { const auto& id_column_context = entry.second; for (int32_t pos : id_column_context.pos_in_result_) { std::string* device_tag = nullptr; - device_tag = device_query_task_->get_device_id()->get_segments().at( - id_column_context.pos_in_device_id_); + const auto& segments = + device_query_task_->get_device_id()->get_segments(); + int32_t pos_in_device_id = id_column_context.pos_in_device_id_; + if (pos_in_device_id >= 0 && + static_cast<size_t>(pos_in_device_id - 1) < segments.size()) { + device_tag = segments[pos_in_device_id - 1]; + } + if (device_tag == nullptr) { ret = col_appenders_[pos + 1]->fill_null( current_block_->get_row_count()); diff --git a/cpp/src/reader/device_meta_iterator.cc b/cpp/src/reader/device_meta_iterator.cc index 
4f47341c..642afdbc 100644 --- a/cpp/src/reader/device_meta_iterator.cc +++ b/cpp/src/reader/device_meta_iterator.cc @@ -43,7 +43,7 @@ int DeviceMetaIterator::next( } int DeviceMetaIterator::load_results() { - bool is_root_idx_node = true; + int root_num = meta_index_nodes_.size(); while (!meta_index_nodes_.empty()) { // To avoid ASan overflow. // using `const auto&` creates a reference @@ -58,12 +58,9 @@ int DeviceMetaIterator::load_results() { } else { return common::E_INVALID_NODE_TYPE; } - // The first MetaIndexNode is the root and is not loaded here, so no - // need to destruct it here. - if (!is_root_idx_node) { + if (root_num-- <= 0) { meta_data_index_node->~MetaIndexNode(); } - is_root_idx_node = false; } return common::E_OK; diff --git a/cpp/src/reader/device_meta_iterator.h b/cpp/src/reader/device_meta_iterator.h index 55f20913..8b39006b 100644 --- a/cpp/src/reader/device_meta_iterator.h +++ b/cpp/src/reader/device_meta_iterator.h @@ -37,6 +37,16 @@ class DeviceMetaIterator { pa_.init(512, common::MOD_DEVICE_META_ITER); } + DeviceMetaIterator(TsFileIOReader *io_reader, + const std::vector<MetaIndexNode *> &meta_index_node_list, + const Filter *id_filter) + : io_reader_(io_reader), id_filter_(id_filter) { + for (auto meta_index_node : meta_index_node_list) { + meta_index_nodes_.push(meta_index_node); + } + pa_.init(512, common::MOD_DEVICE_META_ITER); + } + ~DeviceMetaIterator() { pa_.destroy(); } bool has_next(); diff --git a/cpp/src/reader/imeta_data_querier.h b/cpp/src/reader/imeta_data_querier.h index 73a005e8..c034f915 100644 --- a/cpp/src/reader/imeta_data_querier.h +++ b/cpp/src/reader/imeta_data_querier.h @@ -57,7 +57,10 @@ class IMetadataQuerier { virtual std::unique_ptr<DeviceMetaIterator> device_iterator( MetaIndexNode* root, const Filter* id_filter) = 0; -}; + // FIXME(Colin): refine this. 
+ virtual std::unique_ptr<DeviceMetaIterator> device_iterator( + std::vector<MetaIndexNode*> root, const Filter* id_filter) = 0; +}; } // end namespace storage #endif // READER_IMETA_DATA_QUERIER_H diff --git a/cpp/src/reader/meta_data_querier.cc b/cpp/src/reader/meta_data_querier.cc index 5a32b922..0accbdde 100644 --- a/cpp/src/reader/meta_data_querier.cc +++ b/cpp/src/reader/meta_data_querier.cc @@ -98,6 +98,12 @@ std::unique_ptr<DeviceMetaIterator> MetadataQuerier::device_iterator( new DeviceMetaIterator(io_reader_, root, id_filter)); } +std::unique_ptr<DeviceMetaIterator> MetadataQuerier::device_iterator( + std::vector<MetaIndexNode*> root, const Filter* id_filter) { + return std::unique_ptr<DeviceMetaIterator>( + new DeviceMetaIterator(io_reader_, root, id_filter)); +} + int MetadataQuerier::load_chunk_meta( const std::pair<IDeviceID, std::string>& key, std::vector<ChunkMeta*>& chunk_meta_list) { diff --git a/cpp/src/reader/meta_data_querier.h b/cpp/src/reader/meta_data_querier.h index b4eed350..525ecf86 100644 --- a/cpp/src/reader/meta_data_querier.h +++ b/cpp/src/reader/meta_data_querier.h @@ -61,6 +61,9 @@ class MetadataQuerier : public IMetadataQuerier { std::unique_ptr<DeviceMetaIterator> device_iterator( MetaIndexNode* root, const Filter* id_filter) override; + std::unique_ptr<DeviceMetaIterator> device_iterator( + std::vector<MetaIndexNode*> root, const Filter* id_filter) override; + void clear() override; private: diff --git a/cpp/src/reader/table_query_executor.cc b/cpp/src/reader/table_query_executor.cc index d09a5c90..6fbd4f0f 100644 --- a/cpp/src/reader/table_query_executor.cc +++ b/cpp/src/reader/table_query_executor.cc @@ -19,17 +19,19 @@ #include "reader/table_query_executor.h" +#include "utils/db_utils.h" + namespace storage { -int TableQueryExecutor::query(const std::string &table_name, - const std::vector<std::string> &columns, - Filter *time_filter, Filter *id_filter, - Filter *field_filter, ResultSet *&ret_qds) { +int 
TableQueryExecutor::query(const std::string& table_name, + const std::vector<std::string>& columns, + Filter* time_filter, Filter* id_filter, + Filter* field_filter, ResultSet*& ret_qds) { int ret = common::E_OK; - TsFileMeta *file_metadata = nullptr; + TsFileMeta* file_metadata = nullptr; file_metadata = tsfile_io_reader_->get_tsfile_meta(); common::PageArena pa; pa.init(512, common::MOD_TSFILE_READER); - MetaIndexNode *table_root = nullptr; + MetaIndexNode* table_root = nullptr; std::shared_ptr<TableSchema> table_schema; if (RET_FAIL( file_metadata->get_table_metaindex_node(table_name, table_root))) { @@ -42,7 +44,7 @@ int TableQueryExecutor::query(const std::string &table_name, return ret; } std::vector<std::string> lower_case_column_names(columns); - for (auto &column : lower_case_column_names) { + for (auto& column : lower_case_column_names) { to_lowercase_inplace(column); } std::shared_ptr<ColumnMapping> column_mapping = @@ -85,6 +87,113 @@ int TableQueryExecutor::query(const std::string &table_name, return ret; } -void TableQueryExecutor::destroy_query_data_set(ResultSet *qds) { delete qds; } +int TableQueryExecutor::query_on_tree( + const std::vector<std::shared_ptr<IDeviceID>>& devices, + const std::vector<std::string>& tag_columns, + const std::vector<std::string>& field_columns, Filter* time_filter, + ResultSet*& ret_qds) { + common::PageArena pa; + pa.init(512, common::MOD_TSFILE_READER); + int ret = common::E_OK; + TsFileMeta* file_meta = tsfile_io_reader_->get_tsfile_meta(); + std::vector<MetaIndexNode*> table_inodes; + for (auto const& device : devices) { + MetaIndexNode* table_inode; + if (RET_FAIL(file_meta->get_table_metaindex_node( + device->get_table_name(), table_inode))) { + }; + table_inodes.push_back(table_inode); + } + + std::vector<common::ColumnSchema> col_schema; + for (auto const& tag : tag_columns) { + col_schema.emplace_back(tag, common::TSDataType::STRING, + common::ColumnCategory::TAG); + } + + std::unordered_map<std::string, 
common::TSDataType> column_types_map; + + for (auto const& device : devices) { + bool all_collected = true; + for (const auto& field_col : field_columns) { + if (column_types_map.find(field_col) == column_types_map.end()) { + all_collected = false; + break; + } + } + if (all_collected) { + break; + } + + std::unordered_set<std::string> measurements(field_columns.begin(), + field_columns.end()); + std::vector<ITimeseriesIndex*> index(measurements.size()); + if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes( + device, measurements, index, pa))) { + assert(0); + } + + for (auto* ts_index : index) { + if (ts_index != nullptr) { + std::string measurement_name = + ts_index->get_measurement_name().to_std_string(); + if (column_types_map.find(measurement_name) == + column_types_map.end()) { + common::TSDataType type = ts_index->get_data_type(); + if (type == common::TSDataType::INT32 || + type == common::TSDataType::INT64 || + type == common::TSDataType::TIMESTAMP || + type == common::TSDataType::DATE) { + type = common::TSDataType::INT64; + } else if (type == common::TSDataType::FLOAT) { + type = common::TSDataType::DOUBLE; + } + column_types_map[measurement_name] = type; + } + } + } + } + + for (const auto& field_col : field_columns) { + if (column_types_map.find(field_col) != column_types_map.end()) { + col_schema.emplace_back(field_col, column_types_map[field_col], + common::ColumnCategory::FIELD); + } else { + col_schema.emplace_back(field_col, + common::TSDataType::INVALID_DATATYPE, + common::ColumnCategory::FIELD); + } + } + + auto schema = std::make_shared<TableSchema>("default", col_schema); + std::shared_ptr<ColumnMapping> column_mapping = + std::make_shared<ColumnMapping>(); + for (size_t i = 0; i < col_schema.size(); ++i) { + column_mapping->add(col_schema[i].column_name_, i, *schema); + } + std::vector<common::TSDataType> datatypes = schema->get_data_types(); + auto device_task_iterator = + std::unique_ptr<DeviceTaskIterator>(new DeviceTaskIterator( + 
schema->get_measurement_names(), table_inodes, column_mapping, + meta_data_querier_, nullptr, schema)); + std::unique_ptr<TsBlockReader> tsblock_reader; + switch (table_query_ordering_) { + case TableQueryOrdering::DEVICE: + tsblock_reader = std::unique_ptr<DeviceOrderedTsBlockReader>( + new DeviceOrderedTsBlockReader( + std::move(device_task_iterator), meta_data_querier_, + block_size_, tsfile_io_reader_, time_filter, nullptr)); + break; + case TableQueryOrdering::TIME: + default: + ret = common::E_UNSUPPORTED_ORDER; + } + assert(tsblock_reader != nullptr); + ret_qds = new TableResultSet(std::move(tsblock_reader), + schema->get_measurement_names(), + schema->get_data_types()); + return ret; +} +void TableQueryExecutor::destroy_query_data_set(ResultSet* qds) { delete qds; } } // end namespace storage diff --git a/cpp/src/reader/table_query_executor.h b/cpp/src/reader/table_query_executor.h index 83a82fe5..974e6b45 100644 --- a/cpp/src/reader/table_query_executor.h +++ b/cpp/src/reader/table_query_executor.h @@ -37,15 +37,15 @@ class TableQueryExecutor { public: enum class TableQueryOrdering { TIME, DEVICE }; - TableQueryExecutor(IMetadataQuerier *meta_data_querier, - TsFileIOReader *tsfile_io_reader, + TableQueryExecutor(IMetadataQuerier* meta_data_querier, + TsFileIOReader* tsfile_io_reader, TableQueryOrdering table_query_ordering, int block_size = 1024) : meta_data_querier_(meta_data_querier), tsfile_io_reader_(tsfile_io_reader), table_query_ordering_(table_query_ordering), block_size_(block_size) {} - TableQueryExecutor(ReadFile *read_file) { + TableQueryExecutor(ReadFile* read_file) { tsfile_io_reader_ = new TsFileIOReader(); tsfile_io_reader_->init(read_file); meta_data_querier_ = new MetadataQuerier(tsfile_io_reader_); @@ -62,14 +62,18 @@ class TableQueryExecutor { tsfile_io_reader_ = nullptr; } } - int query(const std::string &table_name, - const std::vector<std::string> &columns, Filter *time_filter, - Filter *id_filter, Filter *field_filter, ResultSet 
*&ret_qds); - void destroy_query_data_set(ResultSet *qds); + int query(const std::string& table_name, + const std::vector<std::string>& columns, Filter* time_filter, + Filter* id_filter, Filter* field_filter, ResultSet*& ret_qds); + int query_on_tree(const std::vector<std::shared_ptr<IDeviceID>>& devices, + const std::vector<std::string>& tag_columns, + const std::vector<std::string>& field_columns, + Filter* time_filter, ResultSet*& ret_qds); + void destroy_query_data_set(ResultSet* qds); private: - IMetadataQuerier *meta_data_querier_; - TsFileIOReader *tsfile_io_reader_; + IMetadataQuerier* meta_data_querier_; + TsFileIOReader* tsfile_io_reader_; TableQueryOrdering table_query_ordering_; int32_t block_size_; }; diff --git a/cpp/src/reader/task/device_task_iterator.h b/cpp/src/reader/task/device_task_iterator.h index a5079877..ec30a472 100644 --- a/cpp/src/reader/task/device_task_iterator.h +++ b/cpp/src/reader/task/device_task_iterator.h @@ -43,6 +43,21 @@ class DeviceTaskIterator { table_schema_(table_schema) { pa_.init(512, common::MOD_DEVICE_TASK_ITER); } + + DeviceTaskIterator(std::vector<std::string> column_names, + std::vector<MetaIndexNode *> index_roots, + std::shared_ptr<ColumnMapping> column_mapping, + IMetadataQuerier *metadata_querier, + const Filter *id_filter, + std::shared_ptr<TableSchema> table_schema) + : column_names_(column_names), + column_mapping_(column_mapping), + device_meta_iterator_( + metadata_querier->device_iterator(index_roots, id_filter)), + table_schema_(table_schema) { + pa_.init(512, common::MOD_DEVICE_TASK_ITER); + } + ~DeviceTaskIterator() { pa_.destroy(); } bool has_next() const; diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index 6da09430..d4646856 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -99,8 +99,6 @@ int TsFileReader::query(const std::string& table_name, return E_TABLE_NOT_EXIST; } - std::vector<TSDataType> data_types = 
table_schema->get_data_types(); - Filter* time_filter = new TimeBetween(start_time, end_time, false); ret = table_query_executor_->query(to_lower(table_name), columns_names, @@ -108,6 +106,42 @@ int TsFileReader::query(const std::string& table_name, return ret; } +int TsFileReader::query_table_on_tree( + const std::vector<std::string>& measurement_names, int64_t star_time, + int64_t end_time, ResultSet*& result_set) { + int ret = E_OK; + TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta(); + if (tsfile_meta == nullptr) { + return E_TSFILE_WRITER_META_ERR; + } + auto device_names = this->get_all_device_ids(); + std::vector<std::shared_ptr<IDeviceID>> device_ids; + size_t max_len = 0; + for (auto& device_name : device_names) { + std::vector<MeasurementSchema> schemas; + this->get_timeseries_schema(device_name, schemas); + for (auto schema : schemas) { + if (std::find(measurement_names.begin(), measurement_names.end(), + schema.measurement_name_) != + measurement_names.end()) { + device_ids.push_back(device_name); + if (device_name->get_segments().size() > max_len) { + max_len = device_name->get_segments().size(); + } + break; + } + } + } + std::vector<std::string> columns_names(max_len); + for (int i = 0; i < max_len; i++) { + columns_names[i] = "l_" + std::to_string(i); + } + Filter* time_filter = new TimeBetween(star_time, end_time, false); + ret = table_query_executor_->query_on_tree( + device_ids, columns_names, measurement_names, time_filter, result_set); + return ret; +} + void TsFileReader::destroy_query_data_set(storage::ResultSet* qds) { tsfile_executor_->destroy_query_data_set(qds); } diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index eb6a7b70..d316ce3a 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -54,7 +54,7 @@ class TsFileReader { * @param file_path the path of the tsfile which will be opened * @return Returns 0 on success, or a non-zero error code on failure. 
*/ - int open(const std::string &file_path); + int open(const std::string& file_path); /** * @brief close the tsfile, this method should be called after the * query is finished @@ -70,7 +70,7 @@ class TsFileReader { * @param [out] ret_qds the result set * @return Returns 0 on success, or a non-zero error code on failure. */ - int query(storage::QueryExpression *qe, ResultSet *&ret_qds); + int query(storage::QueryExpression* qe, ResultSet*& ret_qds); /** * @brief query the tsfile by the path list, start time and end time * this method is used to query the tsfile by the tree model. @@ -80,8 +80,8 @@ class TsFileReader { * @param [in] end_time the end time * @param [out] result_set the result set */ - int query(std::vector<std::string> &path_list, int64_t start_time, - int64_t end_time, ResultSet *&result_set); + int query(std::vector<std::string>& path_list, int64_t start_time, + int64_t end_time, ResultSet*& result_set); /** * @brief query the tsfile by the table name, columns names, start time * and end time. 
this method is used to query the tsfile by the table @@ -93,19 +93,23 @@ class TsFileReader { * @param [in] end_time the end time * @param [out] result_set the result set */ - int query(const std::string &table_name, - const std::vector<std::string> &columns_names, int64_t start_time, - int64_t end_time, ResultSet *&result_set); + int query(const std::string& table_name, + const std::vector<std::string>& columns_names, int64_t start_time, + int64_t end_time, ResultSet*& result_set); + + int query_table_on_tree(const std::vector<std::string>& measurement_names, + int64_t star_time, int64_t end_time, + ResultSet*& result_set); /** * @brief destroy the result set, this method should be called after the * query is finished and result_set * * @param qds the result set */ - void destroy_query_data_set(ResultSet *qds); - ResultSet *read_timeseries( - const std::shared_ptr<IDeviceID> &device_id, - const std::vector<std::string> &measurement_name); + void destroy_query_data_set(ResultSet* qds); + ResultSet* read_timeseries( + const std::shared_ptr<IDeviceID>& device_id, + const std::vector<std::string>& measurement_name); /** * @brief get all devices in the tsfile * @@ -131,7 +135,7 @@ class TsFileReader { * @return Returns 0 on success, or a non-zero error code on failure. 
*/ int get_timeseries_schema(std::shared_ptr<IDeviceID> device_id, - std::vector<MeasurementSchema> &result); + std::vector<MeasurementSchema>& result); /** * @brief get the table schema by the table name * @@ -139,7 +143,7 @@ class TsFileReader { * @return std::shared_ptr<TableSchema> the table schema */ std::shared_ptr<TableSchema> get_table_schema( - const std::string &table_name); + const std::string& table_name); /** * @brief get all table schemas in the tsfile * @@ -148,12 +152,12 @@ class TsFileReader { std::vector<std::shared_ptr<TableSchema>> get_all_table_schemas(); private: - int get_all_devices(std::vector<std::shared_ptr<IDeviceID>> &device_ids, + int get_all_devices(std::vector<std::shared_ptr<IDeviceID>>& device_ids, std::shared_ptr<MetaIndexNode> index_node, - common::PageArena &pa); - storage::ReadFile *read_file_; - storage::TsFileExecutor *tsfile_executor_; - storage::TableQueryExecutor *table_query_executor_; + common::PageArena& pa); + storage::ReadFile* read_file_; + storage::TsFileExecutor* tsfile_executor_; + storage::TableQueryExecutor* table_query_executor_; }; } // namespace storage diff --git a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc index 477ab24b..e534a131 100644 --- a/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc +++ b/cpp/test/reader/tree_view/tsfile_reader_tree_test.cc @@ -32,6 +32,7 @@ namespace storage { class QDSWithoutTimeGenerator; } + using namespace storage; using namespace common; @@ -49,7 +50,8 @@ class TsFileTreeReaderTest : public ::testing::Test { mode_t mode = 0666; write_file_.create(file_name_, flags, mode); } - void TearDown() override { remove(file_name_.c_str()); } + + void TearDown() override {} std::string file_name_; WriteFile write_file_; @@ -108,6 +110,90 @@ TEST_F(TsFileTreeReaderTest, BasicTest) { reader.close(); } +TEST_F(TsFileTreeReaderTest, ReadTreeByTable) { + TsFileTreeWriter writer(&write_file_); + std::vector<std::string> 
device_ids = {"root.db1.t1", "root.db2.t1", + "root.db3.t2.t3", "root.db3.t3", + "device"}; + std::vector<std::string> measurement_ids = {"temperature", "hudi", "level"}; + for (auto& device_id : device_ids) { + TsRecord record(device_id, 0); + TsRecord record1(device_id, 1); + for (auto const& measurement : measurement_ids) { + auto schema = + new storage::MeasurementSchema(measurement, TSDataType::INT32); + ASSERT_EQ(E_OK, writer.register_timeseries(device_id, schema)); + delete schema; + record.add_point(measurement, static_cast<int64_t>(1)); + record1.add_point(measurement, static_cast<int64_t>(2)); + } + ASSERT_EQ(E_OK, writer.write(record)); + ASSERT_EQ(E_OK, writer.write(record1)); + } + writer.flush(); + writer.close(); + + TsFileReader reader; + reader.open(file_name_); + ResultSet* result; + int ret = reader.query_table_on_tree({"temperature", "hudi"}, INT64_MIN, + INT64_MAX, result); + ASSERT_EQ(ret, E_OK); + + auto* table_result_set = (storage::TableResultSet*)result; + bool has_next = false; + int num = table_result_set->get_metadata()->get_column_count(); + std::unordered_map<std::string, std::string> res; + res["root.db1"] = "t1"; + res["root.db2"] = "t1"; + res["root.db3.t2"] = "t3"; + res["root.db3"] = "t3"; + res["device"] = "null"; + int cnt = 0; + while (IS_SUCC(table_result_set->next(has_next)) && has_next) { + auto t = table_result_set->get_value<int64_t>(1); + ASSERT_TRUE(t == 0 || t == 1); + std::string key = ""; + std::string value = ""; + for (int i = 1; i < num + 1; ++i) { + switch (table_result_set->get_metadata()->get_column_type(i)) { + case INT64: + ASSERT_TRUE(table_result_set->get_value<int64_t>(i) == 1 || + table_result_set->get_value<int64_t>(i) == 0); + break; + case INT32: + ASSERT_TRUE(table_result_set->get_value<int32_t>(i) == 1 || + table_result_set->get_value<int32_t>(i) == 2); + break; + case STRING: { + common::String* str = + table_result_set->get_value<common::String*>(i); + if (i == 2) { + key = std::string(str->buf_, 
str->len_); + ASSERT_TRUE(res.find(key) != res.end()); + } + if (i == 3) { + if (str == nullptr) { + value = "null"; + } else { + value = std::string(str->buf_, str->len_); + } + ASSERT_TRUE(res.find(key) != res.end()); + ASSERT_TRUE(res[key] == value); + } + } break; + default: + break; + } + } + std::cout << std::endl; + cnt++; + } + ASSERT_EQ(cnt, 10); + reader.destroy_query_data_set(result); + reader.close(); +} + TEST_F(TsFileTreeReaderTest, ExtendedRowsAndColumnsTest) { TsFileTreeWriter writer(&write_file_); std::vector<std::string> device_ids = {"device_1", "device_2", "device_3"};
