This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 3cfaae0031 [Improvement](sort) Use heap sort to optimize sort node
(#12700)
3cfaae0031 is described below
commit 3cfaae0031f2a4260528cd4d1b1855af82bf0c3b
Author: Gabriel <[email protected]>
AuthorDate: Wed Sep 21 10:01:52 2022 +0800
[Improvement](sort) Use heap sort to optimize sort node (#12700)
---
be/src/util/simd/bits.h | 2 +-
be/src/vec/CMakeLists.txt | 2 +
be/src/vec/columns/column.cpp | 21 ++++
be/src/vec/columns/column.h | 10 ++
be/src/vec/columns/column_decimal.cpp | 26 +++++
be/src/vec/columns/column_decimal.h | 4 +
be/src/vec/columns/column_string.cpp | 24 +++++
be/src/vec/columns/column_string.h | 4 +
be/src/vec/columns/column_vector.cpp | 25 +++++
be/src/vec/columns/column_vector.h | 4 +
be/src/vec/common/sort/heap_sorter.cpp | 165 ++++++++++++++++++++++++++++
be/src/vec/common/sort/heap_sorter.h | 90 ++++++++++++++++
be/src/vec/common/sort/sorter.cpp | 85 ++-------------
be/src/vec/common/sort/sorter.h | 47 +++-----
be/src/vec/common/sort/topn_sorter.cpp | 92 ++++++++++++++++
be/src/vec/common/sort/topn_sorter.h | 47 ++++++++
be/src/vec/core/sort_block.h | 2 +-
be/src/vec/core/sort_cursor.h | 172 +++++++++++++++++++++++++-----
be/src/vec/exec/vsort_node.cpp | 36 ++++---
be/src/vec/exec/vsort_node.h | 2 -
be/src/vec/runtime/vsorted_run_merger.cpp | 6 +-
be/src/vec/runtime/vsorted_run_merger.h | 6 +-
22 files changed, 704 insertions(+), 168 deletions(-)
diff --git a/be/src/util/simd/bits.h b/be/src/util/simd/bits.h
index 8edbe72f4a..df91e63c61 100644
--- a/be/src/util/simd/bits.h
+++ b/be/src/util/simd/bits.h
@@ -112,7 +112,7 @@ inline static size_t find_byte(const std::vector<T>& vec,
size_t start, T byte)
return (T*)p - vec.data();
}
-inline size_t find_nonzero(const std::vector<uint8_t>& vec, size_t start) {
+inline size_t find_one(const std::vector<uint8_t>& vec, size_t start) {
return find_byte<uint8_t>(vec, start, 1);
}
diff --git a/be/src/vec/CMakeLists.txt b/be/src/vec/CMakeLists.txt
index 01f20742e6..0d0497add1 100644
--- a/be/src/vec/CMakeLists.txt
+++ b/be/src/vec/CMakeLists.txt
@@ -54,7 +54,9 @@ set(VEC_FILES
common/exception.cpp
common/mremap.cpp
common/pod_array.cpp
+ common/sort/heap_sorter.cpp
common/sort/sorter.cpp
+ common/sort/topn_sorter.cpp
common/sort/vsort_exec_exprs.cpp
common/string_utils/string_utils.cpp
core/block.cpp
diff --git a/be/src/vec/columns/column.cpp b/be/src/vec/columns/column.cpp
index 845600bc1b..44521cc98c 100644
--- a/be/src/vec/columns/column.cpp
+++ b/be/src/vec/columns/column.cpp
@@ -52,6 +52,27 @@ void IColumn::sort_column(const ColumnSorter* sorter,
EqualFlags& flags,
sorter->sort_column(static_cast<const IColumn&>(*this), flags, perms,
range, last_column);
}
+// Generic fallback for columns that do not override compare_internal. Every row whose
+// cmp_res entry is still 0 (not yet decided by an earlier sort column) is compared with
+// row rhs_row_id of rhs through compare_at:
+//  - res * direction < 0 (orders strictly before): filter[row] = 1 and cmp_res[row] = 1;
+//  - res * direction > 0 (orders strictly after): cmp_res[row] = 1 only;
+//  - ties leave cmp_res[row] at 0 so a later sort column can break them.
+void IColumn::compare_internal(size_t rhs_row_id, const IColumn& rhs, int
nan_direction_hint,
+ int direction, std::vector<uint8>& cmp_res,
+ uint8* __restrict filter) const {
+ auto sz = this->size();
+ DCHECK(cmp_res.size() == sz);
+ // Walk maximal runs [begin, end) of undecided (cmp_res == 0) rows.
+ size_t begin = simd::find_zero(cmp_res, 0);
+ while (begin < sz) {
+ size_t end = simd::find_one(cmp_res, begin + 1);
+ for (size_t row_id = begin; row_id < end; row_id++) {
+ int res = this->compare_at(row_id, rhs_row_id, rhs,
nan_direction_hint);
+ if (res * direction < 0) {
+ filter[row_id] = 1;
+ cmp_res[row_id] = 1;
+ } else if (res * direction > 0) {
+ cmp_res[row_id] = 1;
+ }
+ }
+ // cmp_res[end] was already 1 (or end >= sz), so resume the search just after it.
+ begin = simd::find_zero(cmp_res, end + 1);
+ }
+}
+
bool is_column_nullable(const IColumn& column) {
return check_column<ColumnNullable>(column);
}
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index f58cfa8c0e..94b39ba1e0 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -402,6 +402,16 @@ public:
virtual int compare_at(size_t n, size_t m, const IColumn& rhs,
int nan_direction_hint) const = 0;
+    /**
+     * Compares every row of this column with one row (row `rhs_row_id`) of column `rhs`,
+     * skipping rows whose relative order has already been decided.
+     * @param rhs_row_id index of the row in `rhs` to compare against.
+     * @param rhs column that holds the row to compare against.
+     * @param nan_direction_hint passed through to the row comparison; together with
+     *        `direction` it determines the ordering.
+     * @param direction sign applied to each comparison result (HeapSorter passes 1 for
+     *        ascending and -1 for descending order).
+     * @param cmp_res one entry per row of this column, expected to hold only 0 or 1.
+     *        Rows with cmp_res[i] == 1 are skipped. Rows that order strictly before or
+     *        after the rhs row are set to 1; ties are left at 0 for the next sort column.
+     * @param filter output with one entry per row: filter[i] is set to 1 when row i
+     *        orders strictly before the rhs row (res * direction < 0). Entries are only
+     *        ever set, never cleared.
+     */
+ virtual void compare_internal(size_t rhs_row_id, const IColumn& rhs, int
nan_direction_hint,
+ int direction, std::vector<uint8>& cmp_res,
+ uint8* __restrict filter) const;
+
/** Returns a permutation that sorts elements of this column,
* i.e. perm[i]-th element of source column should be i-th element of
sorted column.
* reverse - reverse ordering (acsending).
diff --git a/be/src/vec/columns/column_decimal.cpp
b/be/src/vec/columns/column_decimal.cpp
index 690714b2ba..ca7260b16c 100644
--- a/be/src/vec/columns/column_decimal.cpp
+++ b/be/src/vec/columns/column_decimal.cpp
@@ -381,6 +381,32 @@ void ColumnDecimal<T>::sort_column(const ColumnSorter*
sorter, EqualFlags& flags
sorter->template sort_column(static_cast<const Self&>(*this), flags,
perms, range, last_column);
}
+// Decimal specialization of compare_internal: reads the raw values straight from the
+// column data instead of calling compare_at per row. Same contract as the generic
+// version: rows ordering strictly before row rhs_row_id of rhs get filter = 1, decided
+// rows get cmp_res = 1, ties stay 0.
+// NOTE(review): values are compared without any scale adjustment, which assumes rhs
+// has the same scale as this column (true when both come from the same sort
+// expression) — confirm for all callers.
+template <typename T>
+void ColumnDecimal<T>::compare_internal(size_t rhs_row_id, const IColumn& rhs,
+ int nan_direction_hint, int direction,
+ std::vector<uint8>& cmp_res,
+ uint8* __restrict filter) const {
+ auto sz = this->size();
+ DCHECK(cmp_res.size() == sz);
+ const auto& cmp_base = assert_cast<const
ColumnDecimal<T>&>(rhs).get_data()[rhs_row_id];
+
+ // Walk maximal runs [begin, end) of undecided (cmp_res == 0) rows.
+ size_t begin = simd::find_zero(cmp_res, 0);
+ while (begin < sz) {
+ size_t end = simd::find_one(cmp_res, begin + 1);
+ for (size_t row_id = begin; row_id < end; row_id++) {
+ auto value_a = get_data()[row_id];
+ int res = value_a > cmp_base ? 1 : (value_a < cmp_base ? -1 : 0);
+ if (res * direction < 0) {
+ filter[row_id] = 1;
+ cmp_res[row_id] = 1;
+ } else if (res * direction > 0) {
+ cmp_res[row_id] = 1;
+ }
+ }
+ // cmp_res[end] was already 1 (or end >= sz), so resume the search just after it.
+ begin = simd::find_zero(cmp_res, end + 1);
+ }
+}
+
template <>
Decimal32 ColumnDecimal<Decimal32>::get_scale_multiplier() const {
return common::exp10_i32(scale);
diff --git a/be/src/vec/columns/column_decimal.h
b/be/src/vec/columns/column_decimal.h
index 9957742c0d..bd1d90e63c 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -233,6 +233,10 @@ public:
void sort_column(const ColumnSorter* sorter, EqualFlags& flags,
IColumn::Permutation& perms,
EqualRange& range, bool last_column) const override;
+    // Decimal override: compares the stored values directly rather than via compare_at.
+ void compare_internal(size_t rhs_row_id, const IColumn& rhs, int
nan_direction_hint,
+ int direction, std::vector<uint8>& cmp_res,
+ uint8* __restrict filter) const override;
+
UInt32 get_scale() const { return scale; }
T get_scale_multiplier() const;
diff --git a/be/src/vec/columns/column_string.cpp
b/be/src/vec/columns/column_string.cpp
index 87fa8da649..d988791ca0 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -487,4 +487,28 @@ void ColumnString::protect() {
get_offsets().protect();
}
+// String specialization of compare_internal: each undecided row (cmp_res == 0) is
+// compared bytewise with row rhs_row_id of rhs using memcmp_small_allow_overflow15.
+// Rows ordering strictly before it get filter = 1, decided rows get cmp_res = 1, and
+// ties stay 0. nan_direction_hint is not used for strings.
+void ColumnString::compare_internal(size_t rhs_row_id, const IColumn& rhs, int
nan_direction_hint,
+ int direction, std::vector<uint8>& cmp_res,
+ uint8* __restrict filter) const {
+ auto sz = this->size();
+ DCHECK(cmp_res.size() == sz);
+ const auto& cmp_base = assert_cast<const
ColumnString&>(rhs).get_data_at(rhs_row_id);
+ // Walk maximal runs [begin, end) of undecided (cmp_res == 0) rows.
+ size_t begin = simd::find_zero(cmp_res, 0);
+ while (begin < sz) {
+ size_t end = simd::find_one(cmp_res, begin + 1);
+ for (size_t row_id = begin; row_id < end; row_id++) {
+ auto value_a = get_data_at(row_id);
+ int res = memcmp_small_allow_overflow15(value_a.data,
value_a.size, cmp_base.data,
+ cmp_base.size);
+ if (res * direction < 0) {
+ filter[row_id] = 1;
+ cmp_res[row_id] = 1;
+ } else if (res * direction > 0) {
+ cmp_res[row_id] = 1;
+ }
+ }
+ // cmp_res[end] was already 1 (or end >= sz), so resume the search just after it.
+ begin = simd::find_zero(cmp_res, end + 1);
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_string.h
b/be/src/vec/columns/column_string.h
index 56344a85e9..783d79cf07 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -371,6 +371,10 @@ public:
offsets[self_row] = offsets[self_row - 1];
}
}
+
+    // String override: bytewise comparison via memcmp_small_allow_overflow15.
+ void compare_internal(size_t rhs_row_id, const IColumn& rhs, int
nan_direction_hint,
+ int direction, std::vector<uint8>& cmp_res,
+ uint8* __restrict filter) const override;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_vector.cpp
b/be/src/vec/columns/column_vector.cpp
index e80b4009c6..2ca35f6948 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -128,6 +128,31 @@ void ColumnVector<T>::sort_column(const ColumnSorter*
sorter, EqualFlags& flags,
sorter->template sort_column(static_cast<const Self&>(*this), flags,
perms, range, last_column);
}
+// Numeric specialization of compare_internal. Every row whose cmp_res entry is still 0
+// (not yet decided by an earlier sort column) is compared with row rhs_row_id of rhs:
+//  - res * direction < 0 (orders strictly before): filter[row] = 1 and cmp_res[row] = 1;
+//  - res * direction > 0 (orders strictly after): cmp_res[row] = 1 only;
+//  - ties stay 0 so a later sort column can break them.
+template <typename T>
+void ColumnVector<T>::compare_internal(size_t rhs_row_id, const IColumn& rhs,
+                                       int nan_direction_hint, int direction,
+                                       std::vector<uint8>& cmp_res,
+                                       uint8* __restrict filter) const {
+    auto sz = this->size();
+    DCHECK(cmp_res.size() == sz);
+    // Validate the rhs column type once instead of per row.
+    const auto& rhs_column = assert_cast<const ColumnVector<T>&>(rhs);
+    size_t begin = simd::find_zero(cmp_res, 0);
+    while (begin < sz) {
+        size_t end = simd::find_one(cmp_res, begin + 1);
+        for (size_t row_id = begin; row_id < end; row_id++) {
+            // Use compare_at (the comparison IColumn's generic compare_internal uses) so
+            // the result honours nan_direction_hint. A plain `<`/`>` reports a
+            // floating-point NaN as equal to everything, so such a row would be treated
+            // as a tie and dropped by the top-N filter even when it should be kept.
+            int res = compare_at(row_id, rhs_row_id, rhs_column, nan_direction_hint);
+            if (res * direction < 0) {
+                filter[row_id] = 1;
+                cmp_res[row_id] = 1;
+            } else if (res * direction > 0) {
+                cmp_res[row_id] = 1;
+            }
+        }
+        // cmp_res[end] was already 1 (or end >= sz), so resume the search just after it.
+        begin = simd::find_zero(cmp_res, end + 1);
+    }
+}
+
template <typename T>
void ColumnVector<T>::update_crcs_with_value(std::vector<uint64_t>& hashes,
PrimitiveType type,
const uint8_t* __restrict
null_data) const {
diff --git a/be/src/vec/columns/column_vector.h
b/be/src/vec/columns/column_vector.h
index 2867d64c65..d22e8f4b69 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -393,6 +393,10 @@ public:
void sort_column(const ColumnSorter* sorter, EqualFlags& flags,
IColumn::Permutation& perms,
EqualRange& range, bool last_column) const override;
+    // Numeric override of compare_internal (defined in column_vector.cpp).
+ void compare_internal(size_t rhs_row_id, const IColumn& rhs, int
nan_direction_hint,
+ int direction, std::vector<uint8>& cmp_res,
+ uint8* __restrict filter) const override;
+
protected:
Container data;
};
diff --git a/be/src/vec/common/sort/heap_sorter.cpp
b/be/src/vec/common/sort/heap_sorter.cpp
new file mode 100644
index 0000000000..795bd66941
--- /dev/null
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/common/sort/heap_sorter.h"
+
+#include "util/defer_op.h"
+
+namespace doris::vectorized {
+HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset,
+                       ObjectPool* pool, std::vector<bool>& is_asc_order,
+                       std::vector<bool>& nulls_first, const RowDescriptor& row_desc)
+        : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
+          _heap_size(limit + offset),
+          _heap(std::make_unique<SortingHeap>()),
+          _topn_filter_rows(0),
+          _init_sort_descs(false) {}
+
+// Materializes the sort tuple (when required) and feeds every row of `block` into the
+// bounded heap of at most `limit + offset` rows. Once the heap is full, rows that do
+// not order strictly before the current heap top are filtered out in bulk first.
+// The block's data is moved into a ref-counted view shared by the heap cursors.
+Status HeapSorter::append_block(Block* block, bool* mem_reuse) {
+    DCHECK(block->rows() > 0);
+    // Report explicitly that the block is not reused (as TopNSorter does) instead of
+    // relying on the caller's initial value.
+    *mem_reuse = false;
+    if (_heap_size == 0) {
+        // limit + offset == 0: no row can ever be emitted, and _do_filter would read the
+        // top of an empty heap.
+        return Status::OK();
+    }
+    {
+        SCOPED_TIMER(_materialize_timer);
+        if (_vsort_exec_exprs.need_materialize_tuple()) {
+            const auto& output_tuple_expr_ctxs = _vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
+            std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
+            for (size_t i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
+                RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(block, &valid_column_ids[i]));
+            }
+
+            Block new_block;
+            for (auto column_id : valid_column_ids) {
+                new_block.insert(block->get_by_position(column_id));
+            }
+            block->swap(new_block);
+        }
+    }
+    if (!_init_sort_descs) {
+        RETURN_IF_ERROR(_prepare_sort_descs(block));
+    }
+    Block tmp_block = block->clone_empty();
+    tmp_block.swap(*block);
+    // Read the row count before tmp_block is handed over with std::move: a moved-from
+    // Block must not be relied on afterwards.
+    const size_t num_rows = tmp_block.rows();
+    HeapSortCursorBlockView block_view_val(std::move(tmp_block), _sort_description);
+    SharedHeapSortCursorBlockView* block_view =
+            new SharedHeapSortCursorBlockView(std::move(block_view_val));
+    block_view->ref();
+    Defer defer([&] { block_view->unref(); });
+    if (_heap_size == _heap->size()) {
+        {
+            SCOPED_TIMER(_topn_filter_timer);
+            _do_filter(block_view->value(), num_rows);
+        }
+        size_t remain_rows = block_view->value().block.rows();
+        _topn_filter_rows += (num_rows - remain_rows);
+        COUNTER_SET(_topn_filter_rows_counter, _topn_filter_rows);
+        for (size_t i = 0; i < remain_rows; ++i) {
+            HeapSortCursorImpl cursor(i, block_view);
+            _heap->replace_top_if_less(std::move(cursor));
+        }
+    } else {
+        size_t free_slots = std::min<size_t>(_heap_size - _heap->size(), num_rows);
+
+        size_t i = 0;
+        // Fill the remaining heap capacity unconditionally ...
+        for (; i < free_slots; ++i) {
+            HeapSortCursorImpl cursor(i, block_view);
+            _heap->push(std::move(cursor));
+        }
+
+        // ... then only keep rows that beat the current top.
+        for (; i < num_rows; ++i) {
+            HeapSortCursorImpl cursor(i, block_view);
+            _heap->replace_top_if_less(std::move(cursor));
+        }
+    }
+    return Status::OK();
+}
+
+// Drains the heap into _return_block. The heap top is the row replaced first by
+// replace_top_if_less, so rows are popped from last to first and written out in
+// reverse. The final `_offset` rows left in the heap are the ones skipped by OFFSET
+// and are not emitted.
+Status HeapSorter::prepare_for_read() {
+    const auto offset = static_cast<size_t>(_offset);
+    if (!_heap->empty() && _heap->size() > offset) {
+        const auto& top = _heap->top();
+        const size_t num_columns = top.block()->columns();
+        MutableColumns result_columns = top.block()->clone_empty_columns();
+
+        DCHECK(_heap->size() <= _heap_size);
+        const size_t num_output_rows = _heap->size() - offset;
+        // Reserve row capacity inside every output column; reserving result_columns
+        // itself would only size the vector of column pointers.
+        for (auto& column : result_columns) {
+            column->reserve(num_output_rows);
+        }
+
+        // Use a vector to reverse elements in heap
+        std::vector<HeapSortCursorImpl> vector_to_reverse;
+        vector_to_reverse.reserve(num_output_rows);
+        while (_heap->size() > offset) {
+            vector_to_reverse.emplace_back(_heap->top());
+            _heap->pop();
+        }
+        for (auto it = vector_to_reverse.rbegin(); it != vector_to_reverse.rend(); ++it) {
+            const auto rid = it->row_id();
+            const auto cur_block = it->block();
+            for (size_t j = 0; j < num_columns; ++j) {
+                // Read the single column needed instead of calling get_columns(), which
+                // presumably builds a whole Columns vector on every call.
+                result_columns[j]->insert_from(*cur_block->get_by_position(j).column, rid);
+            }
+        }
+        _return_block =
+                vector_to_reverse[0].block()->clone_with_columns(std::move(result_columns));
+    }
+    return Status::OK();
+}
+
+// All result rows are produced by prepare_for_read, so a single call returns
+// everything and always reports end of stream.
+Status HeapSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
+    _return_block.swap(*block);
+    *eos = true;
+    return Status::OK();
+}
+
+// Keeps only the rows of `block_view` that order strictly before the current heap
+// top, evaluating one sort column at a time; rows tied on every column are dropped.
+void HeapSorter::_do_filter(HeapSortCursorBlockView& block_view, size_t num_rows) {
+    const auto& top_cursor = _heap->top();
+    const auto cursor_rid = top_cursor.row_id();
+
+    IColumn::Filter filter(num_rows);
+    for (size_t i = 0; i < num_rows; ++i) {
+        filter[i] = 0;
+    }
+
+    // cmp_res[i] becomes 1 once some sort column has decided row i; ties stay 0 and are
+    // resolved by the next column.
+    std::vector<uint8_t> cmp_res(num_rows, 0);
+
+    for (size_t col_id = 0; col_id < _sort_description.size(); ++col_id) {
+        block_view.sort_columns[col_id]->compare_internal(
+                cursor_rid, *top_cursor.sort_columns()[col_id],
+                _sort_description[col_id].nulls_direction, _sort_description[col_id].direction,
+                cmp_res, filter.data());
+    }
+    block_view.filter_block(filter);
+}
+
+// Builds _sort_description from the ordering expressions, evaluated on the first
+// block: direction is 1 for ASC / -1 for DESC, and nulls_direction flips it for NULLS FIRST.
+Status HeapSorter::_prepare_sort_descs(Block* block) {
+    _sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
+    for (size_t i = 0; i < _sort_description.size(); i++) {
+        const auto& ordering_expr = _vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
+        RETURN_IF_ERROR(ordering_expr->execute(block, &_sort_description[i].column_number));
+
+        _sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
+        _sort_description[i].nulls_direction =
+                _nulls_first[i] ? -_sort_description[i].direction : _sort_description[i].direction;
+    }
+    _init_sort_descs = true;
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/heap_sorter.h
b/be/src/vec/common/sort/heap_sorter.h
new file mode 100644
index 0000000000..f725d585c2
--- /dev/null
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "vec/common/sort/sorter.h"
+
+namespace doris::vectorized {
+
+// Heap of HeapSortCursorImpl built on std::priority_queue. top() is the cursor that
+// replace_top_if_less evicts first, i.e. the worst row currently retained.
+class SortingHeap {
+public:
+    const HeapSortCursorImpl& top() const { return _queue.top(); }
+
+    size_t size() const { return _queue.size(); }
+
+    bool empty() const { return _queue.empty(); }
+
+    void pop() { _queue.pop(); }
+
+    // Removes the current top and inserts `top` in its place.
+    void replace_top(HeapSortCursorImpl&& top) {
+        _queue.pop();
+        _queue.push(std::move(top));
+    }
+
+    void push(HeapSortCursorImpl&& cursor) { _queue.push(std::move(cursor)); }
+
+    // Replaces the current top with `val` only when `val` orders strictly before it,
+    // so the heap size never changes. Must not be called on an empty heap.
+    void replace_top_if_less(HeapSortCursorImpl&& val) {
+        if (val < top()) {
+            replace_top(std::move(val));
+        }
+    }
+
+private:
+    std::priority_queue<HeapSortCursorImpl> _queue;
+};
+
+// Sorter that keeps at most `limit + offset` rows in a SortingHeap instead of
+// buffering and merging whole sorted blocks.
+class HeapSorter final : public Sorter {
+public:
+    HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool,
+               std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
+               const RowDescriptor& row_desc);
+
+    ~HeapSorter() override = default;
+
+    void init_profile(RuntimeProfile* runtime_profile) override {
+        _topn_filter_timer = ADD_TIMER(runtime_profile, "TopNFilterTime");
+        _topn_filter_rows_counter = ADD_COUNTER(runtime_profile, "TopNFilterRows", TUnit::UNIT);
+        _materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime");
+    }
+
+    Status append_block(Block* block, bool* mem_reuse) override;
+
+    Status prepare_for_read() override;
+
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+
+    // NOTE(review): presumably the caller selects this sorter when limit + offset is
+    // below this threshold; the selection logic is not visible here.
+    static constexpr size_t HEAP_SORT_THRESHOLD = 1024;
+
+private:
+    // Drops rows of `block_view` that do not order strictly before the heap top.
+    void _do_filter(HeapSortCursorBlockView& block_view, size_t num_rows);
+
+    // Fills _sort_description from the ordering expressions (first block only).
+    Status _prepare_sort_descs(Block* block);
+
+    size_t _heap_size;
+    std::unique_ptr<SortingHeap> _heap;
+    Block _return_block;
+    int64_t _topn_filter_rows;
+    bool _init_sort_descs;
+
+    RuntimeProfile::Counter* _topn_filter_timer = nullptr;
+    RuntimeProfile::Counter* _topn_filter_rows_counter = nullptr;
+    RuntimeProfile::Counter* _materialize_timer = nullptr;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/sorter.cpp
b/be/src/vec/common/sort/sorter.cpp
index 86de482333..9b5641075d 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -25,7 +25,7 @@ void MergeSorterState::build_merge_tree(SortDescription&
sort_description) {
}
if (sorted_blocks.size() > 1) {
- for (auto& cursor : cursors) priority_queue.push(SortCursor(&cursor));
+ for (auto& cursor : cursors)
priority_queue.push(MergeSortCursor(&cursor));
}
}
@@ -105,11 +105,10 @@ Status Sorter::partial_sort(Block& block) {
return Status::OK();
}
-FullSorter::FullSorter(SortDescription& sort_description, VSortExecExprs&
vsort_exec_exprs,
- int limit, int64_t offset, ObjectPool* pool,
std::vector<bool>& is_asc_order,
+FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t
offset,
+ ObjectPool* pool, std::vector<bool>& is_asc_order,
std::vector<bool>& nulls_first, const RowDescriptor&
row_desc)
- : Sorter(sort_description, vsort_exec_exprs, limit, offset, pool,
is_asc_order,
- nulls_first),
+ : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
_state(std::unique_ptr<MergeSorterState>(new
MergeSorterState(row_desc, offset))) {}
Status FullSorter::append_block(Block* block, bool* mem_reuse) {
@@ -160,9 +159,10 @@ Status FullSorter::_do_sort() {
_state->sorted_blocks.emplace_back(std::move(block));
_state->num_rows += block.rows();
_block_priority_queue.emplace(_pool->add(
- new SortCursorImpl(_state->sorted_blocks.back(),
_sort_description)));
+ new MergeSortCursorImpl(_state->sorted_blocks.back(),
_sort_description)));
} else {
- SortBlockCursor block_cursor(_pool->add(new SortCursorImpl(block,
_sort_description)));
+ MergeSortBlockCursor block_cursor(
+ _pool->add(new MergeSortCursorImpl(block,
_sort_description)));
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->sorted_blocks.emplace_back(std::move(block));
_block_priority_queue.push(block_cursor);
@@ -176,75 +176,4 @@ Status FullSorter::_do_sort() {
return Status::OK();
}
-TopNSorter::TopNSorter(SortDescription& sort_description, VSortExecExprs&
vsort_exec_exprs,
- int limit, int64_t offset, ObjectPool* pool,
std::vector<bool>& is_asc_order,
- std::vector<bool>& nulls_first, const RowDescriptor&
row_desc)
- : Sorter(sort_description, vsort_exec_exprs, limit, offset, pool,
is_asc_order,
- nulls_first),
- _state(std::unique_ptr<MergeSorterState>(new
MergeSorterState(row_desc, offset))) {}
-
-Status TopNSorter::append_block(Block* block, bool* mem_reuse) {
- DCHECK(block->rows() > 0);
- RETURN_IF_ERROR(_do_sort(block, mem_reuse));
- return Status::OK();
-}
-
-Status TopNSorter::prepare_for_read() {
- _state->build_merge_tree(_sort_description);
- return Status::OK();
-}
-
-Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
- if (_state->sorted_blocks.empty()) {
- *eos = true;
- } else if (_state->sorted_blocks.size() == 1) {
- if (_offset != 0) {
- _state->sorted_blocks[0].skip_num_rows(_offset);
- }
- block->swap(_state->sorted_blocks[0]);
- *eos = true;
- } else {
- RETURN_IF_ERROR(_state->merge_sort_read(state, block, eos));
- }
- return Status::OK();
-}
-
-Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) {
- *mem_reuse = false;
- RETURN_IF_ERROR(partial_sort(*block));
- // dispose TOP-N logic
- if (_limit != -1) {
- // Here is a little opt to reduce the mem uasge, we build a max heap
- // to order the block in _block_priority_queue.
- // if one block totally greater the heap top of _block_priority_queue
- // we can throw the block data directly.
- if (_state->num_rows < _limit) {
- Block sorted_block;
- sorted_block.swap(*block);
- _state->sorted_blocks.emplace_back(std::move(sorted_block));
- _state->num_rows += sorted_block.rows();
- _block_priority_queue.emplace(_pool->add(
- new SortCursorImpl(_state->sorted_blocks.back(),
_sort_description)));
- } else {
- Block sorted_block;
- sorted_block.swap(*block);
- SortBlockCursor block_cursor(
- _pool->add(new SortCursorImpl(sorted_block,
_sort_description)));
- if (!block_cursor.totally_greater(_block_priority_queue.top())) {
- _state->sorted_blocks.emplace_back(std::move(sorted_block));
- _block_priority_queue.push(block_cursor);
- } else {
- *mem_reuse = true;
- block->clear_column_data();
- }
- }
- } else {
- Block sorted_block;
- sorted_block.swap(*block);
- // dispose normal sort logic
- _state->sorted_blocks.emplace_back(std::move(sorted_block));
- }
- return Status::OK();
-}
-
} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 9d881538a0..2e56f8012a 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -47,8 +47,8 @@ public:
Status merge_sort_read(doris::RuntimeState* state,
doris::vectorized::Block* block, bool* eos);
- std::priority_queue<SortCursor> priority_queue;
- std::vector<SortCursorImpl> cursors;
+ std::priority_queue<MergeSortCursor> priority_queue;
+ std::vector<MergeSortCursorImpl> cursors;
std::unique_ptr<MutableBlock> unsorted_block;
std::vector<Block> sorted_blocks;
uint64_t num_rows = 0;
@@ -60,11 +60,9 @@ private:
class Sorter {
public:
- Sorter(SortDescription& sort_description, VSortExecExprs&
vsort_exec_exprs, int limit,
- int64_t offset, ObjectPool* pool, std::vector<bool>& is_asc_order,
- std::vector<bool>& nulls_first)
- : _sort_description(sort_description),
- _vsort_exec_exprs(vsort_exec_exprs),
+ Sorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset,
ObjectPool* pool,
+ std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first)
+ : _vsort_exec_exprs(vsort_exec_exprs),
_limit(limit),
_offset(offset),
_pool(pool),
@@ -73,10 +71,10 @@ public:
virtual ~Sorter() = default;
- void init_profile(RuntimeProfile* runtime_profile) {
+ virtual void init_profile(RuntimeProfile* runtime_profile) {
_partial_sort_timer = ADD_TIMER(runtime_profile, "PartialSortTime");
_merge_block_timer = ADD_TIMER(runtime_profile, "MergeBlockTime");
- }
+ };
virtual Status append_block(Block* block, bool* mem_reuse) = 0;
@@ -87,7 +85,7 @@ public:
protected:
Status partial_sort(Block& block);
- SortDescription& _sort_description;
+ SortDescription _sort_description;
VSortExecExprs& _vsort_exec_exprs;
int _limit;
int64_t _offset;
@@ -95,16 +93,17 @@ protected:
std::vector<bool>& _is_asc_order;
std::vector<bool>& _nulls_first;
- std::priority_queue<SortBlockCursor> _block_priority_queue;
RuntimeProfile::Counter* _partial_sort_timer = nullptr;
RuntimeProfile::Counter* _merge_block_timer = nullptr;
+
+ std::priority_queue<MergeSortBlockCursor> _block_priority_queue;
};
class FullSorter final : public Sorter {
public:
- FullSorter(SortDescription& sort_description, VSortExecExprs&
vsort_exec_exprs, int limit,
- int64_t offset, ObjectPool* pool, std::vector<bool>&
is_asc_order,
- std::vector<bool>& nulls_first, const RowDescriptor& row_desc);
+ FullSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset,
ObjectPool* pool,
+ std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
+ const RowDescriptor& row_desc);
~FullSorter() override = default;
@@ -128,24 +127,4 @@ private:
static constexpr size_t BUFFERED_BLOCK_BYTES = 16 << 20;
};
-class TopNSorter final : public Sorter {
-public:
- TopNSorter(SortDescription& sort_description, VSortExecExprs&
vsort_exec_exprs, int limit,
- int64_t offset, ObjectPool* pool, std::vector<bool>&
is_asc_order,
- std::vector<bool>& nulls_first, const RowDescriptor& row_desc);
-
- ~TopNSorter() override = default;
-
- Status append_block(Block* block, bool* mem_reuse) override;
-
- Status prepare_for_read() override;
-
- Status get_next(RuntimeState* state, Block* block, bool* eos) override;
-
-private:
- Status _do_sort(Block* block, bool* mem_reuse);
-
- std::unique_ptr<MergeSorterState> _state;
-};
-
} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/topn_sorter.cpp
b/be/src/vec/common/sort/topn_sorter.cpp
new file mode 100644
index 0000000000..4ed7af6d04
--- /dev/null
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "vec/common/sort/topn_sorter.h"
+
+namespace doris::vectorized {
+
+TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset,
+                       ObjectPool* pool, std::vector<bool>& is_asc_order,
+                       std::vector<bool>& nulls_first, const RowDescriptor& row_desc)
+        : Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order, nulls_first),
+          _state(std::unique_ptr<MergeSorterState>(new MergeSorterState(row_desc, offset))) {}
+
+Status TopNSorter::append_block(Block* block, bool* mem_reuse) {
+    DCHECK(block->rows() > 0);
+    RETURN_IF_ERROR(_do_sort(block, mem_reuse));
+    return Status::OK();
+}
+
+Status TopNSorter::prepare_for_read() {
+    _state->build_merge_tree(_sort_description);
+    return Status::OK();
+}
+
+// Returns the retained rows: directly (minus OFFSET) when a single block was kept,
+// otherwise through the merge of all sorted blocks.
+Status TopNSorter::get_next(RuntimeState* state, Block* block, bool* eos) {
+    if (_state->sorted_blocks.empty()) {
+        *eos = true;
+    } else if (_state->sorted_blocks.size() == 1) {
+        if (_offset != 0) {
+            _state->sorted_blocks[0].skip_num_rows(_offset);
+        }
+        block->swap(_state->sorted_blocks[0]);
+        *eos = true;
+    } else {
+        RETURN_IF_ERROR(_state->merge_sort_read(state, block, eos));
+    }
+    return Status::OK();
+}
+
+// Partially sorts `block` and keeps it for the final merge. With a LIMIT, once at
+// least offset + limit rows are retained, a block that is totally greater than the
+// heap top of _block_priority_queue is discarded instead of being stored.
+Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) {
+    *mem_reuse = false;
+    RETURN_IF_ERROR(partial_sort(*block));
+    // dispose TOP-N logic
+    if (_limit != -1) {
+        // Here is a little opt to reduce the mem usage, we build a max heap
+        // to order the block in _block_priority_queue.
+        // if one block totally greater the heap top of _block_priority_queue
+        // we can throw the block data directly.
+        // Rows skipped by OFFSET still take part in the merge, so discarding is only
+        // safe after offset + limit rows have been retained.
+        const auto needed_rows = static_cast<uint64_t>(_offset + _limit);
+        if (_state->num_rows < needed_rows || _block_priority_queue.empty()) {
+            Block sorted_block;
+            sorted_block.swap(*block);
+            // Count the rows before moving: a moved-from Block must not be relied on
+            // for its row count (previously num_rows never grew, disabling this opt).
+            _state->num_rows += sorted_block.rows();
+            _state->sorted_blocks.emplace_back(std::move(sorted_block));
+            _block_priority_queue.emplace(_pool->add(
+                    new MergeSortCursorImpl(_state->sorted_blocks.back(), _sort_description)));
+        } else {
+            Block sorted_block;
+            sorted_block.swap(*block);
+            MergeSortBlockCursor block_cursor(
+                    _pool->add(new MergeSortCursorImpl(sorted_block, _sort_description)));
+            if (!block_cursor.totally_greater(_block_priority_queue.top())) {
+                _state->sorted_blocks.emplace_back(std::move(sorted_block));
+                _block_priority_queue.push(block_cursor);
+            } else {
+                *mem_reuse = true;
+                block->clear_column_data();
+            }
+        }
+    } else {
+        Block sorted_block;
+        sorted_block.swap(*block);
+        // dispose normal sort logic
+        _state->sorted_blocks.emplace_back(std::move(sorted_block));
+    }
+    return Status::OK();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/common/sort/topn_sorter.h
b/be/src/vec/common/sort/topn_sorter.h
new file mode 100644
index 0000000000..675442f5a1
--- /dev/null
+++ b/be/src/vec/common/sort/topn_sorter.h
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include <queue>
+
+#include "vec/common/sort/sorter.h"
+
+namespace doris::vectorized {
+
+// Sorter used for LIMIT queries: every input block is sorted on arrival and buffered as a
+// sorted run; `get_next` then merges the buffered runs.
+class TopNSorter final : public Sorter {
+public:
+    // VSortNode only selects this sorter when `_limit + _offset` is below this threshold.
+    static constexpr size_t TOPN_SORT_THRESHOLD = 256;
+
+    TopNSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offset, ObjectPool* pool,
+               std::vector<bool>& is_asc_order, std::vector<bool>& nulls_first,
+               const RowDescriptor& row_desc);
+    ~TopNSorter() override = default;
+
+    // Sorter interface: consume input, finish input, then produce output.
+    Status append_block(Block* block, bool* mem_reuse) override;
+    Status prepare_for_read() override;
+    Status get_next(RuntimeState* state, Block* block, bool* eos) override;
+
+private:
+    // Sorts one incoming block and decides whether it must be retained for the final merge.
+    Status _do_sort(Block* block, bool* mem_reuse);
+
+    // Buffered sorted runs plus the merge state consumed by `get_next`.
+    std::unique_ptr<MergeSorterState> _state;
+};
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/core/sort_block.h b/be/src/vec/core/sort_block.h
index 2a2babf46f..cc791881c1 100644
--- a/be/src/vec/core/sort_block.h
+++ b/be/src/vec/core/sort_block.h
@@ -77,7 +77,7 @@ struct EqualRangeIterator {
// should continue to sort this row according to current column. Using
the first non-zero
// value and first zero value after first non-zero value as two
bounds, we can get an equal range here
if (!(_cur_range_begin == 0) || !(_flags[_cur_range_begin] == 1)) {
- _cur_range_begin = simd::find_nonzero(_flags, _cur_range_begin +
1);
+ _cur_range_begin = simd::find_one(_flags, _cur_range_begin + 1);
if (_cur_range_begin >= _end) {
return false;
}
diff --git a/be/src/vec/core/sort_cursor.h b/be/src/vec/core/sort_cursor.h
index d6c8613bb1..b13316fe40 100644
--- a/be/src/vec/core/sort_cursor.h
+++ b/be/src/vec/core/sort_cursor.h
@@ -21,22 +21,133 @@
#pragma once
#include "vec/columns/column.h"
-#include "vec/columns/column_string.h"
-#include "vec/common/assert_cast.h"
-#include "vec/common/typeid_cast.h"
#include "vec/core/block.h"
-#include "vec/core/column_numbers.h"
#include "vec/core/sort_description.h"
#include "vec/exprs/vexpr_context.h"
-#include "vec/runtime/vdata_stream_recvr.h"
namespace doris::vectorized {
+// Owns one input block together with raw pointers to its sort columns, resolved through
+// `desc`. `desc` is held by reference, so the SortDescription must outlive this view.
+struct HeapSortCursorBlockView {
+public:
+    Block block;
+    ColumnRawPtrs sort_columns;
+    SortDescription& desc;
+
+    // Takes ownership of `cur_block`: the rvalue is moved in, not copied.
+    HeapSortCursorBlockView(Block&& cur_block, SortDescription& sort_desc)
+            : block(std::move(cur_block)), desc(sort_desc) {
+        _reset();
+    }
+
+    // Applies `filter` to the block via `Block::filter_block_internal`, which replaces the
+    // block's columns, so the cached `sort_columns` pointers are re-resolved afterwards.
+    void filter_block(IColumn::Filter& filter) {
+        Block::filter_block_internal(&block, filter, block.columns());
+        _reset();
+    }
+
+private:
+    // Rebuilds `sort_columns` in `desc` order, looking each column up by name when the
+    // description carries one and by position otherwise.
+    void _reset() {
+        sort_columns.clear();
+        auto columns = block.get_columns();
+        for (const auto& column_desc : desc) {
+            size_t column_number = !column_desc.column_name.empty()
+                                           ? block.get_position_by_name(column_desc.column_name)
+                                           : column_desc.column_number;
+            sort_columns.push_back(columns[column_number].get());
+        }
+    }
+};
+
+// Intrusively reference-counted owner of a `HeapSortCursorBlockView`. It is used instead of
+// `std::shared_ptr` because a `HeapSortCursorBlockView` is never accessed concurrently, so the
+// atomic reference counting that `std::shared_ptr` performs is unnecessary. The destructor is
+// private: instances must be created with `new`, and an instance deletes itself when `unref()`
+// drops the count to zero.
+class SharedHeapSortCursorBlockView {
+public:
+    SharedHeapSortCursorBlockView(HeapSortCursorBlockView&& reference)
+            : _ref_count(0), _reference(std::move(reference)) {}
+    SharedHeapSortCursorBlockView(const SharedHeapSortCursorBlockView&) = delete;
+    SharedHeapSortCursorBlockView& operator=(const SharedHeapSortCursorBlockView&) = delete;
+
+    // Releases one reference and destroys `this` once none remain. Not thread-safe.
+    void unref() noexcept {
+        DCHECK_GT(_ref_count, 0);
+        _ref_count--;
+        if (_ref_count == 0) {
+            delete this;
+        }
+    }
+    // Acquires one reference. Not thread-safe.
+    void ref() noexcept { _ref_count++; }
+
+    HeapSortCursorBlockView& value() { return _reference; }
+
+private:
+    ~SharedHeapSortCursorBlockView() noexcept = default;
+    int _ref_count;
+    HeapSortCursorBlockView _reference;
+};
+
+// Cursor addressing one row (`_row_id`) of a shared block view. Copies share the view and add a
+// reference; moves transfer the reference without touching the count.
+struct HeapSortCursorImpl {
+public:
+    HeapSortCursorImpl(int row_id, SharedHeapSortCursorBlockView* block_view)
+            : _row_id(row_id), _block_view(block_view) {
+        block_view->ref();
+    }
+
+    HeapSortCursorImpl(const HeapSortCursorImpl& other)
+            : _row_id(other._row_id), _block_view(other._block_view) {
+        // A moved-from cursor holds no view; copying it must not dereference null.
+        if (_block_view) {
+            _block_view->ref();
+        }
+    }
+
+    // noexcept lets containers such as std::vector move (rather than copy) cursors when they
+    // reallocate, avoiding needless ref()/unref() traffic.
+    HeapSortCursorImpl(HeapSortCursorImpl&& other) noexcept
+            : _row_id(other._row_id), _block_view(other._block_view) {
+        other._block_view = nullptr;
+    }
+
+    // Swap-based move: `other` takes over our previous view and releases it when destroyed.
+    HeapSortCursorImpl& operator=(HeapSortCursorImpl&& other) noexcept {
+        std::swap(_row_id, other._row_id);
+        std::swap(_block_view, other._block_view);
+        return *this;
+    }
+
+    ~HeapSortCursorImpl() {
+        if (_block_view) {
+            _block_view->unref();
+        }
+    }
+
+    size_t row_id() const { return _row_id; }
+
+    const ColumnRawPtrs& sort_columns() const { return _block_view->value().sort_columns; }
+
+    const Block* block() const { return &_block_view->value().block; }
+
+    const SortDescription& sort_desc() const { return _block_view->value().desc; }
+
+    // Lexicographic comparison over the sort columns; true when this row orders strictly before
+    // `rhs`. `direction` (presumably +1 for ASC, -1 for DESC) is multiplied into `res`, so a
+    // negative `res` means "before" regardless of the column's sort order.
+    bool operator<(const HeapSortCursorImpl& rhs) const {
+        const auto& desc = sort_desc();
+        const auto& lhs_columns = sort_columns();
+        const auto& rhs_columns = rhs.sort_columns();
+        for (size_t i = 0; i < desc.size(); ++i) {
+            int direction = desc[i].direction;
+            int nulls_direction = desc[i].nulls_direction;
+            int res = direction * lhs_columns[i]->compare_at(row_id(), rhs.row_id(),
+                                                             *rhs_columns[i], nulls_direction);
+            if (res < 0) {
+                return true;
+            }
+            if (res > 0) {
+                return false;
+            }
+        }
+        return false;
+    }
+
+private:
+    size_t _row_id;
+    SharedHeapSortCursorBlockView* _block_view;
+};
+
/** Cursor allows to compare rows in different blocks (and parts).
* Cursor moves inside single block.
* It is used in priority queue.
*/
-struct SortCursorImpl {
+struct MergeSortCursorImpl {
ColumnRawPtrs all_columns;
ColumnRawPtrs sort_columns;
SortDescription desc;
@@ -44,20 +155,21 @@ struct SortCursorImpl {
size_t pos = 0;
size_t rows = 0;
- SortCursorImpl() = default;
- virtual ~SortCursorImpl() = default;
+ MergeSortCursorImpl() = default;
+ virtual ~MergeSortCursorImpl() = default;
- SortCursorImpl(const Block& block, const SortDescription& desc_)
+ MergeSortCursorImpl(const Block& block, const SortDescription& desc_)
: desc(desc_), sort_columns_size(desc.size()) {
reset(block);
}
- SortCursorImpl(const Columns& columns, const SortDescription& desc_)
+ MergeSortCursorImpl(const Columns& columns, const SortDescription& desc_)
: desc(desc_), sort_columns_size(desc.size()) {
for (auto& column_desc : desc) {
if (!column_desc.column_name.empty()) {
- LOG(FATAL) << "SortDesctiption should contain column position
if SortCursor was "
- "used without header.";
+ LOG(FATAL)
+ << "SortDesctiption should contain column position if
MergeSortCursor was "
+ "used without header.";
}
}
reset(columns, {});
@@ -101,7 +213,7 @@ struct SortCursorImpl {
using BlockSupplier = std::function<Status(Block**)>;
-struct ReceiveQueueSortCursorImpl : public SortCursorImpl {
+struct ReceiveQueueSortCursorImpl : public MergeSortCursorImpl {
ReceiveQueueSortCursorImpl(const BlockSupplier& block_supplier,
const std::vector<VExprContext*>& ordering_expr,
const std::vector<bool>& is_asc_order,
@@ -123,7 +235,7 @@ struct ReceiveQueueSortCursorImpl : public SortCursorImpl {
for (int i = 0; i < desc.size(); ++i) {
_ordering_expr[i]->execute(_block_ptr, &desc[i].column_number);
}
- SortCursorImpl::reset(*_block_ptr);
+ MergeSortCursorImpl::reset(*_block_ptr);
return true;
}
_block_ptr = nullptr;
@@ -150,14 +262,14 @@ struct ReceiveQueueSortCursorImpl : public SortCursorImpl
{
};
/// For easy copying.
-struct SortCursor {
- SortCursorImpl* impl;
+struct MergeSortCursor {
+ MergeSortCursorImpl* impl;
- SortCursor(SortCursorImpl* impl_) : impl(impl_) {}
- SortCursorImpl* operator->() const { return impl; }
+ MergeSortCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+ MergeSortCursorImpl* operator->() const { return impl; }
/// The specified row of this cursor is greater than the specified row of
another cursor.
- int8_t greater_at(const SortCursor& rhs, size_t lhs_pos, size_t rhs_pos)
const {
+ int8_t greater_at(const MergeSortCursor& rhs, size_t lhs_pos, size_t
rhs_pos) const {
for (size_t i = 0; i < impl->sort_columns_size; ++i) {
int direction = impl->desc[i].direction;
int nulls_direction = impl->desc[i].nulls_direction;
@@ -175,7 +287,7 @@ struct SortCursor {
}
/// Checks that all rows in the current block of this cursor are less than
or equal to all the rows of the current block of another cursor.
- bool totally_less(const SortCursor& rhs) const {
+ bool totally_less(const MergeSortCursor& rhs) const {
if (impl->rows == 0 || rhs.impl->rows == 0) {
return false;
}
@@ -184,23 +296,23 @@ struct SortCursor {
return greater_at(rhs, impl->rows - 1, 0) == -1;
}
- bool greater(const SortCursor& rhs) const {
+ bool greater(const MergeSortCursor& rhs) const {
return !impl->empty() && greater_at(rhs, impl->pos, rhs.impl->pos) > 0;
}
/// Inverted so that the priority queue elements are removed in ascending
order.
- bool operator<(const SortCursor& rhs) const { return greater(rhs); }
+ bool operator<(const MergeSortCursor& rhs) const { return greater(rhs); }
};
/// For easy copying.
-struct SortBlockCursor {
- SortCursorImpl* impl;
+struct MergeSortBlockCursor {
+ MergeSortCursorImpl* impl;
- SortBlockCursor(SortCursorImpl* impl_) : impl(impl_) {}
- SortCursorImpl* operator->() const { return impl; }
+ MergeSortBlockCursor(MergeSortCursorImpl* impl_) : impl(impl_) {}
+ MergeSortCursorImpl* operator->() const { return impl; }
/// The specified row of this cursor is greater than the specified row of
another cursor.
- int8_t less_at(const SortBlockCursor& rhs, int rows) const {
+ int8_t less_at(const MergeSortBlockCursor& rhs, int rows) const {
for (size_t i = 0; i < impl->sort_columns_size; ++i) {
int direction = impl->desc[i].direction;
int nulls_direction = impl->desc[i].nulls_direction;
@@ -218,7 +330,7 @@ struct SortBlockCursor {
}
/// Checks that all rows in the current block of this cursor are less than
or equal to all the rows of the current block of another cursor.
- bool totally_greater(const SortBlockCursor& rhs) const {
+ bool totally_greater(const MergeSortBlockCursor& rhs) const {
if (impl->rows == 0 || rhs.impl->rows == 0) {
return false;
}
@@ -228,7 +340,9 @@ struct SortBlockCursor {
}
/// Inverted so that the priority queue elements are removed in ascending
order.
- bool operator<(const SortBlockCursor& rhs) const { return less_at(rhs,
impl->rows - 1) == 1; }
+ bool operator<(const MergeSortBlockCursor& rhs) const {
+ return less_at(rhs, impl->rows - 1) == 1;
+ }
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index c090efa961..54323c561c 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -17,10 +17,13 @@
#include "vec/exec/vsort_node.h"
+#include "common/config.h"
#include "exec/sort_exec_exprs.h"
#include "runtime/row_batch.h"
#include "runtime/runtime_state.h"
#include "util/debug_util.h"
+#include "vec/common/sort/heap_sorter.h"
+#include "vec/common/sort/topn_sorter.h"
#include "vec/core/sort_block.h"
#include "vec/utils/util.hpp"
@@ -35,24 +38,23 @@ Status VSortNode::init(const TPlanNode& tnode,
RuntimeState* state) {
RETURN_IF_ERROR(_vsort_exec_exprs.init(tnode.sort_node.sort_info, _pool));
_is_asc_order = tnode.sort_node.sort_info.is_asc_order;
_nulls_first = tnode.sort_node.sort_info.nulls_first;
- bool has_string_slot = false;
- for (const auto& tuple_desc : child(0)->row_desc().tuple_descriptors()) {
- for (const auto& slot : tuple_desc->slots()) {
- if (slot->type().is_string_type()) {
- has_string_slot = true;
- break;
- }
- }
- if (has_string_slot) {
- break;
- }
- }
- if (has_string_slot && _limit > 0 && _limit <
ACCUMULATED_PARTIAL_SORT_THRESHOLD) {
- _sorter.reset(new TopNSorter(_sort_description, _vsort_exec_exprs,
_limit, _offset, _pool,
- _is_asc_order, _nulls_first,
child(0)->row_desc()));
+ const auto& row_desc = child(0)->row_desc();
+    // If `limit + offset` is smaller than HEAP_SORT_THRESHOLD, we prefer heap sort.
+    // To do heap sorting, each incoming block is filtered by the heap-top row, which involves
+    // some `memcpy` operations. To make sure heap sort does not cause a performance regression,
+    // we exclude inputs whose blocks contain variable-length (e.g. string) columns, which are
+    // sensitive to operations like `filter` and `memcpy`.
+ if (_limit > 0 && _limit + _offset < HeapSorter::HEAP_SORT_THRESHOLD &&
+ !row_desc.has_varlen_slots()) {
+ _sorter.reset(new HeapSorter(_vsort_exec_exprs, _limit, _offset,
_pool, _is_asc_order,
+ _nulls_first, row_desc));
+ } else if (_limit > 0 && row_desc.has_varlen_slots() && _limit > 0 &&
+ _limit + _offset < TopNSorter::TOPN_SORT_THRESHOLD) {
+ _sorter.reset(new TopNSorter(_vsort_exec_exprs, _limit, _offset,
_pool, _is_asc_order,
+ _nulls_first, row_desc));
} else {
- _sorter.reset(new FullSorter(_sort_description, _vsort_exec_exprs,
_limit, _offset, _pool,
- _is_asc_order, _nulls_first,
child(0)->row_desc()));
+ _sorter.reset(new FullSorter(_vsort_exec_exprs, _limit, _offset,
_pool, _is_asc_order,
+ _nulls_first, row_desc));
}
_sorter->init_profile(_runtime_profile.get());
diff --git a/be/src/vec/exec/vsort_node.h b/be/src/vec/exec/vsort_node.h
index e7cb12b437..ff640b6851 100644
--- a/be/src/vec/exec/vsort_node.h
+++ b/be/src/vec/exec/vsort_node.h
@@ -63,8 +63,6 @@ private:
std::vector<bool> _is_asc_order;
std::vector<bool> _nulls_first;
- SortDescription _sort_description;
-
std::unique_ptr<Sorter> _sorter;
static constexpr size_t ACCUMULATED_PARTIAL_SORT_THRESHOLD = 256;
diff --git a/be/src/vec/runtime/vsorted_run_merger.cpp
b/be/src/vec/runtime/vsorted_run_merger.cpp
index f7bd965299..937d743780 100644
--- a/be/src/vec/runtime/vsorted_run_merger.cpp
+++ b/be/src/vec/runtime/vsorted_run_merger.cpp
@@ -50,7 +50,7 @@ Status VSortedRunMerger::prepare(const vector<BlockSupplier>&
input_runs, bool p
}
for (auto& _cursor : _cursors) {
- if (!_cursor._is_eof) _priority_queue.push(SortCursor(&_cursor));
+ if (!_cursor._is_eof) _priority_queue.push(MergeSortCursor(&_cursor));
}
for (const auto& cursor : _cursors) {
@@ -142,7 +142,7 @@ Status VSortedRunMerger::get_next(Block* output_block,
bool* eos) {
return Status::OK();
}
-void VSortedRunMerger::next_heap(SortCursor& current) {
+void VSortedRunMerger::next_heap(MergeSortCursor& current) {
if (!current->isLast()) {
current->next();
_priority_queue.push(current);
@@ -151,7 +151,7 @@ void VSortedRunMerger::next_heap(SortCursor& current) {
}
}
-inline bool VSortedRunMerger::has_next_block(doris::vectorized::SortCursor&
current) {
+inline bool
VSortedRunMerger::has_next_block(doris::vectorized::MergeSortCursor& current) {
ScopedTimer<MonotonicStopWatch> timer(_get_next_block_timer);
return current->has_next_block();
}
diff --git a/be/src/vec/runtime/vsorted_run_merger.h
b/be/src/vec/runtime/vsorted_run_merger.h
index 05c463fec3..e374f2cdc0 100644
--- a/be/src/vec/runtime/vsorted_run_merger.h
+++ b/be/src/vec/runtime/vsorted_run_merger.h
@@ -72,7 +72,7 @@ protected:
size_t _offset = 0;
std::vector<ReceiveQueueSortCursorImpl> _cursors;
- std::priority_queue<SortCursor> _priority_queue;
+ std::priority_queue<MergeSortCursor> _priority_queue;
Block _empty_block;
@@ -83,8 +83,8 @@ protected:
RuntimeProfile::Counter* _get_next_block_timer;
private:
- void next_heap(SortCursor& current);
- bool has_next_block(SortCursor& current);
+ void next_heap(MergeSortCursor& current);
+ bool has_next_block(MergeSortCursor& current);
};
} // namespace vectorized
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]