This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d0d9a6669c5 [Enhancement](sort) optimize for heap sort (#51368)
d0d9a6669c5 is described below
commit d0d9a6669c54133ff2b7d8c423f986a74a6bb635
Author: Pxl <[email protected]>
AuthorDate: Mon Jun 16 20:13:59 2025 +0800
[Enhancement](sort) optimize for heap sort (#51368)
Use MergeSorterQueue, a heap of sorted block cursors, instead of a
std::priority_queue of per-row cursors.
```sql
SELECT COUNT(UserAgentMajor), COUNT(ResolutionHeight),
COUNT(URLCategoryID), COUNT(OS), COUNT(EventTime), COUNT(CounterID),
COUNT(EventDate), COUNT(UserID), COUNT(UTMSource), COUNT(UTMMedium),
COUNT(UTMCampaign), COUNT(UTMContent) FROM ( SELECT UserAgentMajor,
ResolutionHeight, URLCategoryID, OS, EventTime, CounterID, EventDate, UserID,
UTMSource, UTMMedium, UTMCampaign, UTMContent FROM hits_100m ORDER BY
UserAgentMajor, ResolutionHeight, URLCategoryID, OS, EventTime L [...]
before:
- AppendBlockTime: 10sec77ms
- ExecTime: 13sec646ms
after:
- AppendBlockTime: 1sec811ms
- ExecTime: 2sec31ms
```
---
be/src/vec/common/sort/heap_sorter.cpp | 190 ++++-----------------
be/src/vec/common/sort/heap_sorter.h | 78 +--------
be/src/vec/common/sort/partition_sorter.cpp | 4 +-
be/src/vec/common/sort/sorter.cpp | 16 +-
be/src/vec/common/sort/sorter.h | 5 +-
be/src/vec/common/sort/topn_sorter.cpp | 2 +-
be/src/vec/core/sort_cursor.h | 120 ++++---------
be/src/vec/runtime/vsorted_run_merger.cpp | 2 +-
be/test/pipeline/operator/sort_operator_test.cpp | 33 +++-
be/test/vec/exec/sort/heap_sorter_test.cpp | 36 ++--
be/test/vec/exec/sort/merge_sorter_state.cpp | 2 +-
.../data/variant_p0/test_sub_path_pruning.out | Bin 5855 -> 5855 bytes
.../data/variant_p0/topn_opt_read_by_rowids.out | Bin 9886 -> 9886 bytes
.../suites/variant_p0/test_sub_path_pruning.groovy | 32 ++--
.../variant_p0/topn_opt_read_by_rowids.groovy | 8 +-
15 files changed, 152 insertions(+), 376 deletions(-)
diff --git a/be/src/vec/common/sort/heap_sorter.cpp
b/be/src/vec/common/sort/heap_sorter.cpp
index dfabd9e436a..aed775cb6d8 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -17,199 +17,67 @@
#include "vec/common/sort/heap_sorter.h"
-#include <glog/logging.h>
-
-#include <algorithm>
-
-#include "runtime/primitive_type.h"
-#include "runtime/thread_context.h"
-#include "util/defer_op.h"
-#include "vec/columns/column.h"
-#include "vec/columns/column_nullable.h"
-#include "vec/common/sort/vsort_exec_exprs.h"
-#include "vec/core/column_with_type_and_name.h"
-#include "vec/core/sort_description.h"
-#include "vec/data_types/data_type_nullable.h"
-#include "vec/exprs/vexpr_context.h"
-
-namespace doris {
+namespace doris::vectorized {
#include "common/compile_check_begin.h"
-class ObjectPool;
-class RowDescriptor;
-class RuntimeState;
-} // namespace doris
-namespace doris::vectorized {
HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int64_t limit,
int64_t offset,
ObjectPool* pool, std::vector<bool>& is_asc_order,
std::vector<bool>& nulls_first, const RowDescriptor&
row_desc)
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
- _data_size(0),
_heap_size(limit + offset),
- _heap(SortingHeap::create_unique()),
- _topn_filter_rows(0),
- _init_sort_descs(false) {}
+ _state(MergeSorterState::create_unique(row_desc, offset)) {}
Status HeapSorter::append_block(Block* block) {
- DCHECK(block->rows() > 0);
- {
- SCOPED_TIMER(_materialize_timer);
- if (_vsort_exec_exprs.need_materialize_tuple()) {
- auto output_tuple_expr_ctxs =
_vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
- std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
- for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
- RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(block,
&valid_column_ids[i]));
- }
-
- Block new_block;
- int i = 0;
- const auto& convert_nullable_flags =
_vsort_exec_exprs.get_convert_nullable_flags();
- for (auto column_id : valid_column_ids) {
- if (column_id < 0) {
- continue;
- }
- if (i < convert_nullable_flags.size() &&
convert_nullable_flags[i]) {
- auto column_ptr =
make_nullable(block->get_by_position(column_id).column);
- new_block.insert({column_ptr,
-
make_nullable(block->get_by_position(column_id).type), ""});
- } else {
- new_block.insert(block->get_by_position(column_id));
- }
- i++;
- }
- block->swap(new_block);
+ auto tmp_block = std::make_shared<Block>(block->clone_empty()); // NOTE(review): the removed code DCHECKed block->rows() > 0; an empty block now becomes a zero-row cursor - confirm callers never pass one
+ RETURN_IF_ERROR(partial_sort(*block, *tmp_block, true)); // reversed = true: the row that ranks last in the requested order comes first
+ _queue.push(
+ MergeSortCursor(std::make_shared<MergeSortCursorImpl>(tmp_block,
_sort_description)));
+ _queue_row_num += tmp_block->rows();
+ _data_size += tmp_block->allocated_bytes(); // undone in the remove_top() branch below
+
+ while (_queue.is_valid() && _queue_row_num > _heap_size) { // discard surplus rows from the queue top (the worst-ranked rows) until only limit + offset rows remain
+ auto [current, current_rows] = _queue.current();
+ current_rows = std::min(current_rows, _queue_row_num - _heap_size); // never discard more than the surplus
+
+ if (!current->impl->is_last(current_rows)) {
+ _queue.next(current_rows);
+ } else {
+ _queue.remove_top();
+ _data_size -= current->impl->block->allocated_bytes(); // the whole block is dropped, so release its accounted bytes
}
+ _queue_row_num -= current_rows;
}
- if (!_init_sort_descs) {
- RETURN_IF_ERROR(_prepare_sort_descs(block));
- }
- Block tmp_block = block->clone_empty();
- tmp_block.swap(*block);
- size_t num_rows = tmp_block.rows();
- auto block_view =
- std::make_shared<HeapSortCursorBlockView>(std::move(tmp_block),
_sort_description);
- bool filtered = false;
- if (_heap_size == _heap->size()) {
- {
- SCOPED_TIMER(_topn_filter_timer);
- _do_filter(*block_view, num_rows);
- }
- size_t remain_rows = block_view->block.rows();
- _topn_filter_rows += (num_rows - remain_rows);
- COUNTER_SET(_topn_filter_rows_counter, _topn_filter_rows);
- filtered = remain_rows == 0;
- for (size_t i = 0; i < remain_rows; ++i) {
- HeapSortCursorImpl cursor(i, block_view);
- _heap->replace_top_if_less(std::move(cursor));
- }
- } else {
- size_t free_slots = std::min<size_t>(_heap_size - _heap->size(),
num_rows);
-
- size_t i = 0;
- for (; i < free_slots; ++i) {
- HeapSortCursorImpl cursor(i, block_view);
- _heap->push(std::move(cursor));
- }
- for (; i < num_rows; ++i) {
- HeapSortCursorImpl cursor(i, block_view);
- _heap->replace_top_if_less(std::move(cursor));
- }
- }
- if (!filtered) {
- _data_size += block_view->block.allocated_bytes();
- }
return Status::OK();
}
Status HeapSorter::prepare_for_read() {
- if (!_heap->empty() && _heap->size() > _offset) {
- const auto& top = _heap->top();
- size_t num_columns = top.block()->columns();
- MutableColumns result_columns = top.block()->clone_empty_columns();
-
- size_t init_size = std::min((size_t)_limit, _heap->size());
- result_columns.reserve(init_size);
-
- DCHECK(_heap->size() <= _heap_size);
- // Use a vector to reverse elements in heap
- std::vector<HeapSortCursorImpl> vector_to_reverse;
- vector_to_reverse.reserve(init_size);
- size_t capacity = 0;
- while (!_heap->empty()) {
- auto current = _heap->top();
- _heap->pop();
- vector_to_reverse.emplace_back(std::move(current));
- capacity++;
- if (_offset != 0 && _heap->size() == _offset) {
- break;
- }
- }
- for (int64_t i = capacity - 1; i >= 0; i--) {
- auto rid = vector_to_reverse[i].row_id();
- const auto cur_block = vector_to_reverse[i].block();
- Columns columns = cur_block->get_columns();
- for (size_t j = 0; j < num_columns; ++j) {
- result_columns[j]->insert_from(*(columns[j]), rid);
- }
+ while (_queue.is_valid()) { // move every block still held by _queue into _state for the final merge; _queue is empty afterwards
+ auto [current, current_rows] = _queue.current();
+ if (current_rows) {
+ current->impl->reverse(); // rebuild the remaining rows (from pos onward) in the requested order
+ _state->get_queue().push(MergeSortCursor(current->impl));
}
- _return_block =
vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns));
+ _queue.remove_top();
}
return Status::OK();
}
Status HeapSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
- _return_block.swap(*block);
- *eos = true;
- return Status::OK();
+ return _state->merge_sort_read(block, state->batch_size(), eos);
}
Field HeapSorter::get_top_value() {
Field field {PrimitiveType::TYPE_NULL};
// get field from first sort column of top row
- if (_heap->size() >= _heap_size) {
- auto& top = _heap->top();
- top.sort_columns()[0]->get(top.row_id(), field);
+ if (_queue.is_valid() && _queue_row_num >= _heap_size) { // is_valid(): prepare_for_read() empties _queue without resetting _queue_row_num, and _heap_size (limit + offset) may be 0
+ auto [current, current_rows] = _queue.current();
+ field = current->get_top_value(); // top of the reverse-sorted queue = worst row still kept, i.e. the current top-n threshold
}
return field;
}
-// need exception safety
-void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t
num_rows) {
- const auto& top_cursor = _heap->top();
- const auto cursor_rid = top_cursor.row_id();
-
- IColumn::Filter filter(num_rows);
- for (size_t i = 0; i < num_rows; ++i) {
- filter[i] = 0;
- }
-
- std::vector<uint8_t> cmp_res(num_rows, 0);
-
- for (size_t col_id = 0; col_id < _sort_description.size(); ++col_id) {
- block_view.sort_columns[col_id]->compare_internal(
- cursor_rid, *top_cursor.sort_columns()[col_id],
- _sort_description[col_id].nulls_direction,
_sort_description[col_id].direction,
- cmp_res, filter.data());
- }
- block_view.filter_block(filter);
-}
-
-Status HeapSorter::_prepare_sort_descs(Block* block) {
- _sort_description.resize(_vsort_exec_exprs.ordering_expr_ctxs().size());
- for (int i = 0; i < _sort_description.size(); i++) {
- const auto& ordering_expr = _vsort_exec_exprs.ordering_expr_ctxs()[i];
- RETURN_IF_ERROR(ordering_expr->execute(block,
&_sort_description[i].column_number));
-
- _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
- _sort_description[i].nulls_direction =
- _nulls_first[i] ? -_sort_description[i].direction :
_sort_description[i].direction;
- }
- _init_sort_descs = true;
- return Status::OK();
-}
-
size_t HeapSorter::data_size() const {
return _data_size;
}
diff --git a/be/src/vec/common/sort/heap_sorter.h
b/be/src/vec/common/sort/heap_sorter.h
index b36ef28af70..16a1e7d5d08 100644
--- a/be/src/vec/common/sort/heap_sorter.h
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -16,62 +16,11 @@
// under the License.
#pragma once
-#include <gen_cpp/Metrics_types.h>
-#include <stddef.h>
-#include <stdint.h>
-#include <memory>
-#include <queue>
-#include <utility>
-#include <vector>
-
-#include "common/status.h"
-#include "util/runtime_profile.h"
#include "vec/common/sort/sorter.h"
-#include "vec/core/block.h"
-#include "vec/core/field.h"
-#include "vec/core/sort_cursor.h"
-
-namespace doris {
-#include "common/compile_check_begin.h"
-class ObjectPool;
-class RowDescriptor;
-class RuntimeState;
-namespace vectorized {
-class VSortExecExprs;
-} // namespace vectorized
-} // namespace doris
namespace doris::vectorized {
-
-class SortingHeap {
- ENABLE_FACTORY_CREATOR(SortingHeap);
-
-public:
- const HeapSortCursorImpl& top() { return _queue.top(); }
-
- size_t size() { return _queue.size(); }
-
- bool empty() { return _queue.empty(); }
-
- void pop() { _queue.pop(); }
-
- void replace_top(HeapSortCursorImpl&& top) {
- _queue.pop();
- _queue.push(std::move(top));
- }
-
- void push(HeapSortCursorImpl&& cursor) { _queue.push(std::move(cursor)); }
-
- void replace_top_if_less(HeapSortCursorImpl&& val) {
- if (val < top()) {
- replace_top(std::move(val));
- }
- }
-
-private:
- std::priority_queue<HeapSortCursorImpl> _queue;
-};
+#include "common/compile_check_begin.h"
class HeapSorter final : public Sorter {
ENABLE_FACTORY_CREATOR(HeapSorter);
@@ -83,12 +32,6 @@ public:
~HeapSorter() override = default;
- void init_profile(RuntimeProfile* runtime_profile) override {
- _topn_filter_timer = ADD_TIMER(runtime_profile, "TopNFilterTime");
- _topn_filter_rows_counter = ADD_COUNTER(runtime_profile,
"TopNFilterRows", TUnit::UNIT);
- _materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime");
- }
-
Status append_block(Block* block) override;
Status prepare_for_read() override;
@@ -99,23 +42,13 @@ public:
Field get_top_value() override;
- static constexpr size_t HEAP_SORT_THRESHOLD = 1024;
-
private:
- void _do_filter(HeapSortCursorBlockView& block_view, size_t num_rows);
-
- Status _prepare_sort_descs(Block* block);
- size_t _data_size;
- size_t _heap_size;
- std::unique_ptr<SortingHeap> _heap;
- Block _return_block;
- int64_t _topn_filter_rows;
- bool _init_sort_descs;
-
- RuntimeProfile::Counter* _topn_filter_timer = nullptr;
- RuntimeProfile::Counter* _topn_filter_rows_counter = nullptr;
- RuntimeProfile::Counter* _materialize_timer = nullptr;
+ size_t _data_size = 0; // allocated bytes of the blocks still referenced by _queue
+ size_t _heap_size = 0; // limit + offset: number of rows that must be retained
+ size_t _queue_row_num = 0; // rows currently retained by the cursors in _queue
+ MergeSorterQueue _queue; // reverse-sorted blocks; the top cursor points at the worst retained row
+ std::unique_ptr<MergeSorterState> _state; // merges the re-reversed blocks for get_next()
};
#include "common/compile_check_end.h"
diff --git a/be/src/vec/common/sort/partition_sorter.cpp
b/be/src/vec/common/sort/partition_sorter.cpp
index 09c45e83c6a..e1b27e790a4 100644
--- a/be/src/vec/common/sort/partition_sorter.cpp
+++ b/be/src/vec/common/sort/partition_sorter.cpp
@@ -47,7 +47,7 @@ PartitionSorter::PartitionSorter(VSortExecExprs&
vsort_exec_exprs, int64_t limit
bool has_global_limit, int64_t
partition_inner_limit,
TopNAlgorithm::type top_n_algorithm,
SortCursorCmp* previous_row)
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
- _state(MergeSorterState::create_unique(row_desc, offset, limit,
state, profile)),
+ _state(MergeSorterState::create_unique(row_desc, offset)),
_row_desc(row_desc),
_partition_inner_limit(partition_inner_limit),
_top_n_algorithm(
@@ -81,7 +81,7 @@ Status PartitionSorter::prepare_for_read() {
void PartitionSorter::reset_sorter_state(RuntimeState* runtime_state) {
std::priority_queue<MergeSortBlockCursor> empty_queue;
std::swap(_block_priority_queue, empty_queue);
- _state = MergeSorterState::create_unique(_row_desc, _offset, _limit,
runtime_state, nullptr);
+ _state = MergeSorterState::create_unique(_row_desc, _offset);
// _previous_row->impl inited at partition_sort_read function,
// but maybe call get_next after do_partition_topn_sort() function, and
running into else if branch at line 92L
// so _previous_row->impl == nullptr and no need reset.
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 1df2d2bdc62..a2a69552192 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -152,7 +152,7 @@ Status Sorter::merge_sort_read_for_spill(RuntimeState*
state, doris::vectorized:
return get_next(state, block, eos);
}
-Status Sorter::partial_sort(Block& src_block, Block& dest_block) {
+Status Sorter::partial_sort(Block& src_block, Block& dest_block, bool
reversed) { // reversed = true sorts in the inverse of the requested order (HeapSorter passes true)
size_t num_cols = src_block.columns();
if (_materialize_sort_exprs) {
auto output_tuple_expr_ctxs =
_vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
@@ -189,18 +189,18 @@ Status Sorter::partial_sort(Block& src_block, Block&
dest_block) {
_sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
_sort_description[i].nulls_direction =
_nulls_first[i] ? -_sort_description[i].direction :
_sort_description[i].direction;
+ if (reversed) { // flip direction only: assuming comparisons are direction * compare_at(..., nulls_direction), NULL placement inverts as well
+ _sort_description[i].direction *= -1;
+ }
}
{
SCOPED_TIMER(_partial_sort_timer);
- if (_materialize_sort_exprs) {
- sort_block(dest_block, dest_block, _sort_description, _offset +
_limit);
- } else {
- sort_block(src_block, dest_block, _sort_description, _offset +
_limit);
- }
- src_block.clear_column_data(num_cols);
+ uint64_t limit = reversed ? 0 : (_offset + _limit); // reversed callers trim rows themselves; 0 presumably means "no limit" to sort_block
+ sort_block(*result_block, dest_block, _sort_description, limit);
}
+ src_block.clear_column_data(num_cols); // now outside the timed scope, so not counted in _partial_sort_timer
return Status::OK();
}
@@ -209,7 +209,7 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs,
int64_t limit, int64_t
std::vector<bool>& nulls_first, const RowDescriptor&
row_desc,
RuntimeState* state, RuntimeProfile* profile)
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
- _state(MergeSorterState::create_unique(row_desc, offset, limit,
state, profile)) {}
+ _state(MergeSorterState::create_unique(row_desc, offset)) {}
// check whether the unsorted block can hold more data from input block and no
need to alloc new memory
bool FullSorter::has_enough_capacity(Block* input_block, Block*
unsorted_block) const {
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 37e74947ccc..cd56af60775 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -52,8 +52,7 @@ class MergeSorterState {
ENABLE_FACTORY_CREATOR(MergeSorterState);
public:
- MergeSorterState(const RowDescriptor& row_desc, int64_t offset, int64_t
limit,
- RuntimeState* state, RuntimeProfile* profile)
+ MergeSorterState(const RowDescriptor& row_desc, int64_t offset)
// create_empty_block should ignore invalid slots, unsorted_block
// should be same structure with arrival block from child node
// since block from child node may ignored these slots
@@ -152,7 +151,7 @@ public:
void set_enable_spill() { _enable_spill = true; }
protected:
- Status partial_sort(Block& src_block, Block& dest_block);
+ Status partial_sort(Block& src_block, Block& dest_block, bool reversed =
false);
bool _enable_spill = false;
SortDescription _sort_description;
diff --git a/be/src/vec/common/sort/topn_sorter.cpp
b/be/src/vec/common/sort/topn_sorter.cpp
index 3e0bc5ee688..fe3cecca5cd 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -44,7 +44,7 @@ TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs,
int64_t limit, int64_t
std::vector<bool>& nulls_first, const RowDescriptor&
row_desc,
RuntimeState* state, RuntimeProfile* profile)
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
- _state(MergeSorterState::create_unique(row_desc, offset, limit,
state, profile)),
+ _state(MergeSorterState::create_unique(row_desc, offset)),
_row_desc(row_desc) {}
Status TopNSorter::append_block(Block* block) {
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index 9331508c376..3e3baf6a9e0 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -24,98 +24,12 @@
#include "vec/columns/column.h"
#include "vec/core/block.h"
+#include "vec/core/field.h"
#include "vec/core/sort_description.h"
#include "vec/exprs/vexpr_context.h"
namespace doris::vectorized {
-struct HeapSortCursorBlockView {
-public:
- Block block;
- ColumnRawPtrs sort_columns;
- SortDescription& desc;
-
- HeapSortCursorBlockView(Block&& cur_block, SortDescription& sort_desc)
- : block(cur_block), desc(sort_desc) {
- _reset();
- }
-
- // need exception safety
- void filter_block(IColumn::Filter& filter) {
- Block::filter_block_internal(&block, filter, block.columns());
- _reset();
- }
-
-private:
- void _reset() {
- sort_columns.clear();
- auto columns = block.get_columns_and_convert();
- for (auto& column_desc : desc) {
- size_t column_number = !column_desc.column_name.empty()
- ?
block.get_position_by_name(column_desc.column_name)
- : column_desc.column_number;
- sort_columns.push_back(columns[column_number].get());
- }
- }
-};
-
-using HeapSortCursorBlockSPtr = std::shared_ptr<HeapSortCursorBlockView>;
-
-struct HeapSortCursorImpl {
-public:
- HeapSortCursorImpl(size_t row_id, HeapSortCursorBlockSPtr block_view)
- : _row_id(row_id), _block_view(std::move(block_view)) {}
-
- HeapSortCursorImpl(const HeapSortCursorImpl& other) {
- _row_id = other._row_id;
- _block_view = other._block_view;
- }
-
- HeapSortCursorImpl(HeapSortCursorImpl&& other) {
- _row_id = other._row_id;
- _block_view = other._block_view;
- other._block_view = nullptr;
- }
-
- HeapSortCursorImpl& operator=(HeapSortCursorImpl&& other) {
- std::swap(_row_id, other._row_id);
- std::swap(_block_view, other._block_view);
- return *this;
- }
-
- ~HeapSortCursorImpl() = default;
-
- size_t row_id() const { return _row_id; }
-
- const ColumnRawPtrs& sort_columns() const { return
_block_view->sort_columns; }
-
- const Block* block() const { return &_block_view->block; }
-
- const SortDescription& sort_desc() const { return _block_view->desc; }
-
- bool operator<(const HeapSortCursorImpl& rhs) const {
- for (size_t i = 0; i < sort_desc().size(); ++i) {
- int direction = sort_desc()[i].direction;
- int nulls_direction = sort_desc()[i].nulls_direction;
- int res = direction * sort_columns()[i]->compare_at(row_id(),
rhs.row_id(),
-
*(rhs.sort_columns()[i]),
-
nulls_direction);
- // ASC: direction == 1. If bigger, res > 0. So we return true.
- if (res < 0) {
- return true;
- }
- if (res > 0) {
- return false;
- }
- }
- return false;
- }
-
-private:
- size_t _row_id;
- HeapSortCursorBlockSPtr _block_view;
-};
-
/** Cursor allows to compare rows in different blocks (and parts).
* Cursor moves inside single block.
* It is used in priority queue.
@@ -127,8 +41,8 @@ struct MergeSortCursorImpl {
ColumnRawPtrs columns;
SortDescription desc;
size_t sort_columns_size = 0;
- size_t pos = 0;
- size_t rows = 0;
+ int pos = 0;
+ int rows = 0;
MergeSortCursorImpl() = default;
virtual ~MergeSortCursorImpl() = default;
@@ -145,6 +59,22 @@ struct MergeSortCursorImpl {
bool empty() const { return rows == 0; }
+ void reverse() {
+ MutableColumns columns_reversed;
+ for (auto& column : columns) {
+ auto col_reversed = column->clone_empty();
+ for (int j = rows - 1; j >= pos; j--) {
+ col_reversed->insert_from(*column, j);
+ }
+ columns_reversed.push_back(std::move(col_reversed));
+ }
+ block->set_columns(std::move(columns_reversed));
+ for (auto& column_desc : desc) {
+ column_desc.direction *= -1;
+ }
+ reset();
+ }
+
/// Set the cursor to the beginning of the new block.
void reset() {
sort_columns.clear();
@@ -174,6 +104,12 @@ struct MergeSortCursorImpl {
virtual void process_next() {}
virtual Block* block_ptr() { return nullptr; }
virtual bool eof() const { return false; }
+
+ Field get_top_value() const {
+ Field field {PrimitiveType::TYPE_NULL};
+ sort_columns[0]->get(pos, field);
+ return field;
+ }
};
using BlockSupplier = std::function<Status(Block*, bool* eos)>;
@@ -287,6 +223,8 @@ struct MergeSortCursor {
/// Inverted so that the priority queue elements are removed in ascending
order.
bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); }
+
+ Field get_top_value() const { return impl->get_top_value(); }
};
/// For easy copying.
@@ -423,8 +361,8 @@ public:
}
}
- void push(MergeSortCursorImpl& cursor) {
- _queue.emplace_back(&cursor);
+ void push(MergeSortCursor cursor) {
+ _queue.emplace_back(std::move(cursor));
std::push_heap(_queue.begin(), _queue.end());
next_child_idx = 0;
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp
b/be/src/vec/runtime/vsorted_run_merger.cpp
index 25376f9216b..8483e78f25b 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -119,7 +119,7 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
DCHECK(!current->eof());
DCHECK(current->block_ptr() != nullptr);
while (_offset != 0) {
- auto process_rows = std::min(current->rows - current->pos,
_offset);
+ auto process_rows = static_cast<int>(std::min(
static_cast<size_t>(current->rows - current->pos), _offset)); // (int)_offset would truncate offsets above INT_MAX; the min is bounded by rows - pos, so it fits in int
current->next(process_rows);
_offset -= process_rows;
if (current->is_last(0)) {
diff --git a/be/test/pipeline/operator/sort_operator_test.cpp
b/be/test/pipeline/operator/sort_operator_test.cpp
index cd1a4c35d85..0cab5a95bd9 100644
--- a/be/test/pipeline/operator/sort_operator_test.cpp
+++ b/be/test/pipeline/operator/sort_operator_test.cpp
@@ -110,9 +110,9 @@ struct SortOperatorTest : public ::testing::Test {
state->emplace_local_state(source->operator_id(),
std::move(source_local_state_uptr));
}
- { EXPECT_TRUE(sink_local_state->open(state.get()).ok()); }
+ EXPECT_TRUE(sink_local_state->open(state.get()).ok());
- { EXPECT_TRUE(source_local_state->open(state.get()).ok()); }
+ EXPECT_TRUE(source_local_state->open(state.get()).ok());
}
bool is_block(std::vector<Dependency*> deps) {
@@ -167,11 +167,17 @@ TEST_F(SortOperatorTest, test) {
bool eos = false;
auto st = source->get_block(state.get(), &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
- EXPECT_TRUE(eos);
+ EXPECT_FALSE(eos);
EXPECT_EQ(block.rows(), 3);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+
+ block.clear();
+ st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(block.rows(), 0);
}
}
@@ -200,11 +206,26 @@ TEST_F(SortOperatorTest, test_dep) {
bool eos = false;
auto st = source->get_block(state.get(), &block, &eos);
EXPECT_TRUE(st.ok()) << st.msg();
- EXPECT_TRUE(eos);
- EXPECT_EQ(block.rows(), 6);
+ EXPECT_FALSE(eos);
+ EXPECT_EQ(block.rows(), 3);
std::cout << block.dump_data() << std::endl;
EXPECT_TRUE(ColumnHelper::block_equal(
- block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3, 4,
5, 6})));
+ block, ColumnHelper::create_block<DataTypeInt64>({1, 2, 3})));
+
+ block.clear();
+ st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_FALSE(eos);
+ EXPECT_EQ(block.rows(), 3);
+ std::cout << block.dump_data() << std::endl;
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block, ColumnHelper::create_block<DataTypeInt64>({4, 5, 6})));
+
+ block.clear();
+ st = source->get_block(state.get(), &block, &eos);
+ EXPECT_TRUE(st.ok()) << st.msg();
+ EXPECT_TRUE(eos);
+ EXPECT_EQ(block.rows(), 0);
}
}
diff --git a/be/test/vec/exec/sort/heap_sorter_test.cpp
b/be/test/vec/exec/sort/heap_sorter_test.cpp
index b1e13b4e150..24735190fc8 100644
--- a/be/test/vec/exec/sort/heap_sorter_test.cpp
+++ b/be/test/vec/exec/sort/heap_sorter_test.cpp
@@ -98,32 +98,48 @@ TEST_F(HeapSorterTest, test_topn_sorter1) {
EXPECT_TRUE(st.ok());
}
- EXPECT_EQ(sorter->_heap->size(), 6);
+ EXPECT_EQ(sorter->_queue_row_num, 6);
{
Block block = ColumnHelper::create_block<DataTypeInt64>({6}, {6});
auto st = sorter->append_block(&block);
EXPECT_TRUE(st.ok());
- }
- EXPECT_EQ(sorter->_heap->size(), 6);
+ EXPECT_EQ(sorter->_queue_row_num, 6);
- static_cast<void>(sorter->get_top_value());
+ auto value = sorter->get_top_value();
+ Field real;
+ block.get_by_position(0).column->get(0, real);
+ EXPECT_EQ(value, real);
+ }
EXPECT_TRUE(sorter->prepare_for_read());
{
Block block;
- bool eos;
+ bool eos = false;
EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
- std::cout << block.dump_data() << std::endl;
- EXPECT_EQ(block.rows(), 6);
-
+ EXPECT_EQ(block.rows(), 5);
+ EXPECT_EQ(eos, false);
EXPECT_TRUE(ColumnHelper::block_equal(
block,
Block
{ColumnHelper::create_nullable_column_with_name<DataTypeInt64>(
- {1, 2, 3, 4, 5, 6}, {false, false, false,
false, false, false}),
-
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5, 6})}));
+ {1, 2, 3, 4, 5}, {false, false, false, false,
false}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({1, 2, 3, 4, 5})}));
+
+ block.clear();
+ EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+ EXPECT_EQ(block.rows(), 1);
+ EXPECT_EQ(eos, false);
+ EXPECT_TRUE(ColumnHelper::block_equal(
+ block,
+ Block
{ColumnHelper::create_nullable_column_with_name<DataTypeInt64>({6}, {false}),
+
ColumnHelper::create_column_with_name<DataTypeInt64>({6})}));
+
+ block.clear();
+ EXPECT_TRUE(sorter->get_next(&_state, &block, &eos));
+ EXPECT_EQ(block.rows(), 0);
+ EXPECT_EQ(eos, true);
}
}
diff --git a/be/test/vec/exec/sort/merge_sorter_state.cpp
b/be/test/vec/exec/sort/merge_sorter_state.cpp
index 34223fd1f3a..0bc4da12d9c 100644
--- a/be/test/vec/exec/sort/merge_sorter_state.cpp
+++ b/be/test/vec/exec/sort/merge_sorter_state.cpp
@@ -62,7 +62,7 @@ std::shared_ptr<Block> create_block(std::vector<int64_t>
data) {
}
TEST_F(MergeSorterStateTest, test1) {
- state.reset(new MergeSorterState(*row_desc, 0, -1, &_state, &_profile));
+ state.reset(new MergeSorterState(*row_desc, 0));
state->add_sorted_block(create_block({1, 2, 3}));
state->add_sorted_block(create_block({4, 5, 6}));
state->add_sorted_block(create_block({}));
diff --git a/regression-test/data/variant_p0/test_sub_path_pruning.out
b/regression-test/data/variant_p0/test_sub_path_pruning.out
index 16328739167..f8f9e3f3894 100644
Binary files a/regression-test/data/variant_p0/test_sub_path_pruning.out and
b/regression-test/data/variant_p0/test_sub_path_pruning.out differ
diff --git a/regression-test/data/variant_p0/topn_opt_read_by_rowids.out
b/regression-test/data/variant_p0/topn_opt_read_by_rowids.out
index 6ee3844f7ef..93ae6f7c5a3 100644
Binary files a/regression-test/data/variant_p0/topn_opt_read_by_rowids.out and
b/regression-test/data/variant_p0/topn_opt_read_by_rowids.out differ
diff --git a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
index 2ca4ab06683..1de622047a4 100644
--- a/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
+++ b/regression-test/suites/variant_p0/test_sub_path_pruning.groovy
@@ -161,20 +161,20 @@ suite("variant_sub_path_pruning", "variant_type"){
order_qt_sql """select c1['c']['d'] from (select dt['a']['b'] as c1 from
pruning_test union all select dt['a'] as c1 from pruning_test union all select
dt as c1 from pruning_test) v1;"""
// one table + one const list
- order_qt_sql """select id, cast(c1['a'] as text) from (select
cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100;"""
- order_qt_sql """select c1['a'] from (select id, c1 from (select
cast('{"a":1}' as variant) as c1, 1 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100) tmp;"""
- order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all
select dt as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
- // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from
(select cast('1' as variant) as c1, 1 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100) tmp;"""
- order_qt_sql """select id, cast(c1['c'] as text) from (select
cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1,
id from pruning_test) tmp order by 1, 2 limit 100;"""
- order_qt_sql """select c1['c'] from (select id, c1 from (select
cast('{"c":1}' as variant) as c1, 1 as id union all select dt['a']['b'] as c1,
id from pruning_test) tmp order by id limit 100) tmp;"""
- order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a'] as
c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all
select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100)
tmp;"""
- // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from
(select cast('{"c":{"d":1}}' as variant) as c1, 1 as id union all select
dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
+ order_qt_sql """select id, cast(c1['a'] as text) from (select
cast('{"a":1}' as variant) as c1, 0 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100;"""
+ order_qt_sql """select c1['a'] from (select id, c1 from (select
cast('{"a":1}' as variant) as c1, 0 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100) tmp order by id;"""
+ order_qt_sql """select c2['b'] from (select id, cast(c1['a'] as text) as
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all
select dt as c1, id from pruning_test) tmp order by id limit 100) tmp order by
id;"""
+ // order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from
(select cast('1' as variant) as c1, 0 as id union all select dt as c1, id from
pruning_test) tmp order by id limit 100) tmp;"""
+ order_qt_sql """select id, cast(c1['c'] as text) from (select
cast('{"c":1}' as variant) as c1, 0 as id union all select dt['a']['b'] as c1,
id from pruning_test) tmp order by 1, 2 limit 100;"""
+ order_qt_sql """select c1['c'] from (select id, c1 from (select
cast('{"c":1}' as variant) as c1, 0 as id union all select dt['a']['b'] as c1,
id from pruning_test) tmp order by id limit 100) tmp order by id;"""
+ order_qt_sql """select cast(c2['d'] as text) from (select id, c1['a'] as
c2 from (select cast('{"c":{"d":1}}' as variant) as c1, 0 as id union all
select dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp
order by id;"""
+ // order_qt_sql """select c2['c']['d'] from (select id, c1 as c2 from
(select cast('{"c":{"d":1}}' as variant) as c1, 0 as id union all select
dt['a']['b'] as c1, id from pruning_test) tmp order by id limit 100) tmp;"""
// two const list
- order_qt_sql """select id, cast(c1['a'] as text) from (select
cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as
variant) as c1, 2 as id) tmp order by id limit 100;"""
- order_qt_sql """select c1['a'] from (select id, c1 from (select
cast('{"a":1}' as variant) as c1, 1 as id union all select cast('{"a":1}' as
variant) as c1, 2 as id) tmp order by id limit 100) tmp;"""
- order_qt_sql """select cast(c2['b'] as text) from (select id, c1['a'] as
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all
select cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit
100) tmp;"""
- order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select
cast('{"a":{"b":1}}' as variant) as c1, 1 as id union all select
cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100)
tmp;"""
+ order_qt_sql """select id, cast(c1['a'] as text) from (select
cast('{"a":1}' as variant) as c1, 0 as id union all select cast('{"a":1}' as
variant) as c1, 2 as id) tmp order by id limit 100;"""
+ order_qt_sql """select c1['a'] from (select id, c1 from (select
cast('{"a":1}' as variant) as c1, 0 as id union all select cast('{"a":1}' as
variant) as c1, 2 as id) tmp order by id limit 100) tmp order by id;"""
+ order_qt_sql """select cast(c2['b'] as text) from (select id, c1['a'] as
c2 from (select cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all
select cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit
100) tmp order by id;"""
+ order_qt_sql """select c2['a']['b'] from (select id, c1 as c2 from (select
cast('{"a":{"b":1}}' as variant) as c1, 0 as id union all select
cast('{"a":{"b":1}}' as variant) as c1, 2 as id) tmp order by id limit 100) tmp
order by id;"""
// join
@@ -212,8 +212,8 @@ suite("variant_sub_path_pruning", "variant_type"){
order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select id,
cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 from pruning_test order by
id limit 100) tmp;"""
// varaint in one row relation
- order_qt_sql """select c1['a'] from (select 1 as id, cast('{"a":1}' as
variant) as c1 order by id limit 100) tmp;"""
- order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 1 as id,
cast('{"a":1, "b":2}' as variant) as c1 order by id limit 100) tmp;"""
- order_qt_sql """select c1['a'] from (select 1 as id, cast('{"b":{"a":1}}'
as variant)["b"] as c1 order by id limit 100) tmp;"""
- order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 1 as id,
cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 order by id limit 100)
tmp;"""
+ order_qt_sql """select c1['a'] from (select 0 as id, cast('{"a":1}' as
variant) as c1 order by id limit 100) tmp;"""
+ order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 0 as id,
cast('{"a":1, "b":2}' as variant) as c1 order by id limit 100) tmp;"""
+ order_qt_sql """select c1['a'] from (select 0 as id, cast('{"b":{"a":1}}'
as variant)["b"] as c1 order by id limit 100) tmp;"""
+ order_qt_sql """select c1['a'] as c2, c1['b'] as c3 from (select 0 as id,
cast('{"b":{"a":1, "b":2}}' as variant)["b"] as c1 order by id limit 100)
tmp;"""
}
\ No newline at end of file
diff --git a/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy
b/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy
index dd597755a6d..15594775c40 100644
--- a/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy
+++ b/regression-test/suites/variant_p0/topn_opt_read_by_rowids.groovy
@@ -77,14 +77,14 @@ PROPERTIES (
INSERT INTO `test_web_log` VALUES ('2024-04-09 09:01:39', '', '', '',
'', '1712624474952', '0004.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '',
NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '',
'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826',
'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIm
[...]
"""
sql "set topn_opt_limit_threshold = 1024"
- qt_sql """SELECT
+ order_qt_sql """SELECT
* FROM
test_web_log
WHERE
ts >= '1712480940849'
AND ts <= '1712805483291'
ORDER BY
- ts DESC
+ ts DESC
LIMIT 10"""
sql """
INSERT INTO `test_web_log` VALUES ('2024-04-09 09:02:31', '', '', '',
'', '1712624495211', '004.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '',
NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '',
'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826',
'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIml
[...]
@@ -95,10 +95,10 @@ PROPERTIES (
sql """
INSERT INTO `test_web_log` VALUES ('2024-04-09 09:04:33', '', '', '',
'', '1712624474959', '0024.00', '740lp', 'sit-iniwork-lcd-designer.qm.cn', '',
NULL, NULL, '', '630beed604d0513c', '0000.0', NULL, '', '', '', '',
'https://sit-iniwork-lcd-designer.qm.cn/designer', '1544512389907021826',
'https://sit-iniwork-lcd-designer.qm.cn/designer?caseName=44&caseCode=d4&appId=3fec598d32b10d26958d1d9119519c64&token=eyJhbGciOiJIUzUxMiJ9.eyJzdWIiOiI3NDBscCIsImNyZWF0ZWQiOjE3MTI1NDEzMTU0MzUsIm
[...]
"""
- qt_sql """SELECT
+ order_qt_sql """SELECT
* FROM
test_web_log
ORDER BY
- ts DESC
+ ts DESC
LIMIT 10"""
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]