This is an automated email from the ASF dual-hosted git repository. morningman pushed a commit to branch branch-1.2-lts in repository https://gitbox.apache.org/repos/asf/doris.git
commit 296cd73c8c429af26a187f38a0fc9f0f70be58c9 Author: yixiutt <[email protected]> AuthorDate: Mon Jan 16 21:53:18 2023 +0800 [improvement](reader) use union merge when rowset are noneoverlapping (#15749) --- be/src/vec/olap/block_reader.cpp | 36 ++++++++++++++++++--- be/src/vec/olap/block_reader.h | 4 +++ be/src/vec/olap/vcollect_iterator.cpp | 61 ++++++++++++++++++++++++++++++++--- be/src/vec/olap/vcollect_iterator.h | 6 +++- 4 files changed, 97 insertions(+), 10 deletions(-) diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp index aa68e382b6..fa79e1bb9c 100644 --- a/be/src/vec/olap/block_reader.cpp +++ b/be/src/vec/olap/block_reader.cpp @@ -33,9 +33,33 @@ BlockReader::~BlockReader() { } } +bool BlockReader::_rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers) { + std::string cur_max_key; + for (const auto& rs_reader : rs_readers) { + // version 0-1 of every tablet is empty, just skip this rowset + if (rs_reader->rowset()->version().second == 1) { + continue; + } + if (rs_reader->rowset()->num_rows() == 0) { + continue; + } + if (rs_reader->rowset()->is_segments_overlapping()) { + return true; + } + std::string min_key; + bool has_min_key = rs_reader->rowset()->min_key(&min_key); + if (!has_min_key) { + return true; + } + if (min_key <= cur_max_key) { + return true; + } + CHECK(rs_reader->rowset()->max_key(&cur_max_key)); + } + return false; +} Status BlockReader::_init_collect_iter(const ReaderParams& read_params, std::vector<RowsetReaderSharedPtr>* valid_rs_readers) { - _vcollect_iter.init(this, read_params.read_orderby_key, read_params.read_orderby_key_reverse); std::vector<RowsetReaderSharedPtr> rs_readers; auto res = _capture_rs_readers(read_params, &rs_readers); if (!res.ok()) { @@ -46,6 +70,10 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params, << ", version:" << read_params.version; return res; } + // check whether the rowsets are non-overlapping + _is_rowsets_overlapping = 
_rowsets_overlapping(rs_readers); + _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key, + read_params.read_orderby_key_reverse); _reader_context.batch_size = _batch_size; _reader_context.is_vec = true; @@ -63,10 +91,8 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params, } RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers)); - if (_vcollect_iter.is_merge()) { - auto status = _vcollect_iter.current_row(&_next_row); - _eof = status.precise_code() == OLAP_ERR_DATA_EOF; - } + auto status = _vcollect_iter.current_row(&_next_row); + _eof = status.is_end_of_file(); return Status::OK(); } diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h index 0213783da0..83786a2e69 100644 --- a/be/src/vec/olap/block_reader.h +++ b/be/src/vec/olap/block_reader.h @@ -86,6 +86,8 @@ private: bool _get_next_row_same(); + bool _rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers); + VCollectIterator _vcollect_iter; IteratorRowRef _next_row {{}, -1, false}; @@ -115,6 +117,8 @@ private: std::vector<RowLocation> _block_row_locations; ColumnPtr _delete_filter_column; + + bool _is_rowsets_overlapping = true; }; } // namespace vectorized diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp index b9160514ad..cc07e9be30 100644 --- a/be/src/vec/olap/vcollect_iterator.cpp +++ b/be/src/vec/olap/vcollect_iterator.cpp @@ -37,7 +37,8 @@ VCollectIterator::~VCollectIterator() { } } -void VCollectIterator::init(TabletReader* reader, bool force_merge, bool is_reverse) { +void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, + bool is_reverse) { _reader = reader; // when aggregate is enabled or key_type is DUP_KEYS, we don't merge // multiple data to aggregate for better performance @@ -47,8 +48,10 @@ void VCollectIterator::init(TabletReader* reader, bool force_merge, bool is_reve 
_reader->_tablet->enable_unique_key_merge_on_write()))) { _merge = false; } - - if (force_merge) { + // When the data is non-overlapping, there is no need to build a heap to traverse it + if (!ori_data_overlapping) { + _merge = false; + } else if (force_merge) { _merge = true; } _is_reverse = is_reverse; @@ -127,6 +130,22 @@ Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade new Level1Iterator(_children, _reader, _merge, _is_reverse, _skip_same)); } } else { + bool have_multiple_child = false; + bool is_first_child = true; + for (auto iter = _children.begin(); iter != _children.end();) { + auto s = (*iter)->init_for_union(is_first_child, have_multiple_child); + if (!s.ok()) { + delete (*iter); + iter = _children.erase(iter); + if (!s.is<END_OF_FILE>()) { + return s; + } + } else { + have_multiple_child = true; + is_first_child = false; + ++iter; + } + } _inner_iter.reset(new Level1Iterator(_children, _reader, _merge, _is_reverse, _skip_same)); } RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init()); @@ -212,6 +231,36 @@ Status VCollectIterator::Level0Iterator::init(bool get_data_by_ref) { return st; } +// If is_first_child is true, point at the first row of the block. Unique keys and agg keys +// read one row first and then start the loop: +// while (!eof) { +// collect_iter->next(&_next_row); +// } +// so the first child loads its first row and every other child starts at row_pos = -1 +Status VCollectIterator::Level0Iterator::init_for_union(bool is_first_child, bool get_data_by_ref) { + _get_data_by_ref = get_data_by_ref && _rs_reader->support_return_data_by_ref() && + config::enable_storage_vectorization; + if (!_get_data_by_ref) { + _block = std::make_shared<Block>(_schema.create_block( + _reader->_return_columns, _reader->_tablet_columns_convert_to_null_set)); + } + auto st = _refresh_current_row(); + if (_get_data_by_ref && _block_view.size()) { + if (is_first_child) { + _ref = _block_view[0]; + } else { + _ref = _block_view[-1]; + } + } else { + if (is_first_child) { + _ref = {_block, 
0, false}; + } else { + _ref = {_block, -1, false}; + } + } + return st; +} + int64_t VCollectIterator::Level0Iterator::version() const { return _rs_reader->version().second; } @@ -259,7 +308,7 @@ Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) { Status VCollectIterator::Level0Iterator::next(Block* block) { CHECK(!_get_data_by_ref); - if (_ref.row_pos == 0 && _ref.block != nullptr && UNLIKELY(_ref.block->rows() > 0)) { + if (_ref.row_pos <= 0 && _ref.block != nullptr && UNLIKELY(_ref.block->rows() > 0)) { block->swap(*_ref.block); _ref.reset(); return Status::OK(); @@ -306,6 +355,10 @@ VCollectIterator::Level1Iterator::Level1Iterator( _skip_same(skip_same) { _ref.reset(); _batch_size = reader->_batch_size; + // !_merge means that data are in order, so we just reverse children to return data in reverse + if (!_merge && _is_reverse) { + _children.reverse(); + } } VCollectIterator::Level1Iterator::~Level1Iterator() { diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h index b16f752bc0..d64e45cf91 100644 --- a/be/src/vec/olap/vcollect_iterator.h +++ b/be/src/vec/olap/vcollect_iterator.h @@ -39,7 +39,7 @@ public: // Hold reader point to get reader params ~VCollectIterator(); - void init(TabletReader* reader, bool force_merge, bool is_reverse); + void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse); Status add_child(RowsetReaderSharedPtr rs_reader); @@ -85,6 +85,9 @@ private: _compare_columns(reader->_reader_context.read_orderby_key_columns) {}; virtual Status init(bool get_data_by_ref = false) = 0; + virtual Status init_for_union(bool is_first_child, bool get_data_by_ref = false) { + return Status::OK(); + }; virtual int64_t version() const = 0; @@ -146,6 +149,7 @@ private: ~Level0Iterator() override = default; Status init(bool get_data_by_ref = false) override; + Status init_for_union(bool is_first_child, bool get_data_by_ref = false) override; int64_t version() const 
override; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
