This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch opt_perf
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/opt_perf by this push:
new 23301adea3 [Improvement](sort) Reuse memory in sort node (#12922)
23301adea3 is described below
commit 23301adea353f4d548bd9f79a81e11140998e84f
Author: Gabriel <[email protected]>
AuthorDate: Fri Sep 23 17:44:28 2022 +0800
[Improvement](sort) Reuse memory in sort node (#12922)
---
be/src/vec/common/sort/heap_sorter.cpp | 2 +-
be/src/vec/common/sort/heap_sorter.h | 2 +-
be/src/vec/common/sort/sorter.cpp | 39 +++++++++++++++---------
be/src/vec/common/sort/sorter.h | 6 ++--
be/src/vec/common/sort/topn_sorter.cpp | 22 ++++----------
be/src/vec/common/sort/topn_sorter.h | 4 +--
be/src/vec/core/sort_block.cpp | 55 ++++++++++++++++++++++++++++++++++
be/src/vec/core/sort_block.h | 3 ++
be/src/vec/exec/vsort_node.cpp | 12 +++-----
9 files changed, 100 insertions(+), 45 deletions(-)
diff --git a/be/src/vec/common/sort/heap_sorter.cpp b/be/src/vec/common/sort/heap_sorter.cpp
index 795bd66941..6520b005a4 100644
--- a/be/src/vec/common/sort/heap_sorter.cpp
+++ b/be/src/vec/common/sort/heap_sorter.cpp
@@ -29,7 +29,7 @@ HeapSorter::HeapSorter(VSortExecExprs& vsort_exec_exprs, int limit, int64_t offs
_topn_filter_rows(0),
_init_sort_descs(false) {}
-Status HeapSorter::append_block(Block* block, bool* mem_reuse) {
+Status HeapSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
{
SCOPED_TIMER(_materialize_timer);
diff --git a/be/src/vec/common/sort/heap_sorter.h b/be/src/vec/common/sort/heap_sorter.h
index f725d585c2..6f644a9d92 100644
--- a/be/src/vec/common/sort/heap_sorter.h
+++ b/be/src/vec/common/sort/heap_sorter.h
@@ -63,7 +63,7 @@ public:
_materialize_timer = ADD_TIMER(runtime_profile, "MaterializeTime");
}
- Status append_block(Block* block, bool* mem_reuse) override;
+ Status append_block(Block* block) override;
Status prepare_for_read() override;
diff --git a/be/src/vec/common/sort/sorter.cpp b/be/src/vec/common/sort/sorter.cpp
index 9b5641075d..4df6a6abc7 100644
--- a/be/src/vec/common/sort/sorter.cpp
+++ b/be/src/vec/common/sort/sorter.cpp
@@ -72,25 +72,31 @@ Status
MergeSorterState::merge_sort_read(doris::RuntimeState* state,
return Status::OK();
}
-Status Sorter::partial_sort(Block& block) {
+Status Sorter::partial_sort(Block& src_block, Block& dest_block) {
+ bool materialized = false;
if (_vsort_exec_exprs.need_materialize_tuple()) {
auto output_tuple_expr_ctxs =
_vsort_exec_exprs.sort_tuple_slot_expr_ctxs();
std::vector<int> valid_column_ids(output_tuple_expr_ctxs.size());
for (int i = 0; i < output_tuple_expr_ctxs.size(); ++i) {
- RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&block,
&valid_column_ids[i]));
+ RETURN_IF_ERROR(output_tuple_expr_ctxs[i]->execute(&src_block,
&valid_column_ids[i]));
}
Block new_block;
for (auto column_id : valid_column_ids) {
- new_block.insert(block.get_by_position(column_id));
+ new_block.insert(src_block.get_by_position(column_id));
}
- block.swap(new_block);
+ dest_block.swap(new_block);
+ materialized = true;
}
_sort_description.resize(_vsort_exec_exprs.lhs_ordering_expr_ctxs().size());
for (int i = 0; i < _sort_description.size(); i++) {
const auto& ordering_expr =
_vsort_exec_exprs.lhs_ordering_expr_ctxs()[i];
- RETURN_IF_ERROR(ordering_expr->execute(&block,
&_sort_description[i].column_number));
+ if (materialized) {
+ RETURN_IF_ERROR(ordering_expr->execute(&src_block,
&_sort_description[i].column_number));
+ } else {
+ RETURN_IF_ERROR(ordering_expr->execute(&dest_block,
&_sort_description[i].column_number));
+ }
_sort_description[i].direction = _is_asc_order[i] ? 1 : -1;
_sort_description[i].nulls_direction =
@@ -99,7 +105,11 @@ Status Sorter::partial_sort(Block& block) {
{
SCOPED_TIMER(_partial_sort_timer);
- sort_block(block, _sort_description, _offset + _limit);
+ if (materialized) {
+ sort_block(dest_block, _sort_description, _offset + _limit);
+ } else {
+ sort_block(src_block, dest_block, _sort_description, _offset +
_limit);
+ }
}
return Status::OK();
@@ -111,7 +121,7 @@ FullSorter::FullSorter(VSortExecExprs& vsort_exec_exprs,
int limit, int64_t offs
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
_state(std::unique_ptr<MergeSorterState>(new
MergeSorterState(row_desc, offset))) {}
-Status FullSorter::append_block(Block* block, bool* mem_reuse) {
+Status FullSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
{
SCOPED_TIMER(_merge_block_timer);
@@ -147,8 +157,9 @@ Status FullSorter::get_next(RuntimeState* state, Block*
block, bool* eos) {
}
Status FullSorter::_do_sort() {
- Block block = _state->unsorted_block->to_block(0);
- RETURN_IF_ERROR(partial_sort(block));
+ Block src_block = _state->unsorted_block->to_block(0);
+ Block desc_block = src_block.clone_without_columns();
+ RETURN_IF_ERROR(partial_sort(src_block, desc_block));
// dispose TOP-N logic
if (_limit != -1) {
// Here is a little opt to reduce the mem uasge, we build a max heap
@@ -156,21 +167,21 @@ Status FullSorter::_do_sort() {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows < _limit) {
- _state->sorted_blocks.emplace_back(std::move(block));
- _state->num_rows += block.rows();
+ _state->sorted_blocks.emplace_back(std::move(desc_block));
+ _state->num_rows += desc_block.rows();
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->sorted_blocks.back(),
_sort_description)));
} else {
MergeSortBlockCursor block_cursor(
- _pool->add(new MergeSortCursorImpl(block,
_sort_description)));
+ _pool->add(new MergeSortCursorImpl(desc_block,
_sort_description)));
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
- _state->sorted_blocks.emplace_back(std::move(block));
+ _state->sorted_blocks.emplace_back(std::move(desc_block));
_block_priority_queue.push(block_cursor);
}
}
} else {
// dispose normal sort logic
- _state->sorted_blocks.emplace_back(std::move(block));
+ _state->sorted_blocks.emplace_back(std::move(desc_block));
}
_state->reset_block();
return Status::OK();
diff --git a/be/src/vec/common/sort/sorter.h b/be/src/vec/common/sort/sorter.h
index 2e56f8012a..3d6d3d8726 100644
--- a/be/src/vec/common/sort/sorter.h
+++ b/be/src/vec/common/sort/sorter.h
@@ -76,14 +76,14 @@ public:
_merge_block_timer = ADD_TIMER(runtime_profile, "MergeBlockTime");
};
- virtual Status append_block(Block* block, bool* mem_reuse) = 0;
+ virtual Status append_block(Block* block) = 0;
virtual Status prepare_for_read() = 0;
virtual Status get_next(RuntimeState* state, Block* block, bool* eos) = 0;
protected:
- Status partial_sort(Block& block);
+ Status partial_sort(Block& src_block, Block& dest_block);
SortDescription _sort_description;
VSortExecExprs& _vsort_exec_exprs;
@@ -107,7 +107,7 @@ public:
~FullSorter() override = default;
- Status append_block(Block* block, bool* mem_reuse) override;
+ Status append_block(Block* block) override;
Status prepare_for_read() override;
diff --git a/be/src/vec/common/sort/topn_sorter.cpp b/be/src/vec/common/sort/topn_sorter.cpp
index 4ed7af6d04..6ded187cfc 100644
--- a/be/src/vec/common/sort/topn_sorter.cpp
+++ b/be/src/vec/common/sort/topn_sorter.cpp
@@ -25,9 +25,9 @@ TopNSorter::TopNSorter(VSortExecExprs& vsort_exec_exprs, int
limit, int64_t offs
: Sorter(vsort_exec_exprs, limit, offset, pool, is_asc_order,
nulls_first),
_state(std::unique_ptr<MergeSorterState>(new
MergeSorterState(row_desc, offset))) {}
-Status TopNSorter::append_block(Block* block, bool* mem_reuse) {
+Status TopNSorter::append_block(Block* block) {
DCHECK(block->rows() > 0);
- RETURN_IF_ERROR(_do_sort(block, mem_reuse));
+ RETURN_IF_ERROR(_do_sort(block));
return Status::OK();
}
@@ -51,9 +51,9 @@ Status TopNSorter::get_next(RuntimeState* state, Block*
block, bool* eos) {
return Status::OK();
}
-Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) {
- *mem_reuse = false;
- RETURN_IF_ERROR(partial_sort(*block));
+Status TopNSorter::_do_sort(Block* block) {
+ Block sorted_block = block->clone_without_columns();
+ RETURN_IF_ERROR(partial_sort(*block, sorted_block));
// dispose TOP-N logic
if (_limit != -1) {
// Here is a little opt to reduce the mem uasge, we build a max heap
@@ -61,30 +61,20 @@ Status TopNSorter::_do_sort(Block* block, bool* mem_reuse) {
// if one block totally greater the heap top of _block_priority_queue
// we can throw the block data directly.
if (_state->num_rows < _limit) {
- Block sorted_block;
- sorted_block.swap(*block);
_state->sorted_blocks.emplace_back(std::move(sorted_block));
_state->num_rows += sorted_block.rows();
_block_priority_queue.emplace(_pool->add(
new MergeSortCursorImpl(_state->sorted_blocks.back(),
_sort_description)));
} else {
- Block sorted_block;
- sorted_block.swap(*block);
MergeSortBlockCursor block_cursor(
_pool->add(new MergeSortCursorImpl(sorted_block,
_sort_description)));
if (!block_cursor.totally_greater(_block_priority_queue.top())) {
_state->sorted_blocks.emplace_back(std::move(sorted_block));
_block_priority_queue.push(block_cursor);
- } else {
- *mem_reuse = true;
- block->clear_column_data();
}
}
} else {
- Block sorted_block;
- sorted_block.swap(*block);
- // dispose normal sort logic
- _state->sorted_blocks.emplace_back(std::move(sorted_block));
+ return Status::InternalError("Should not reach TopN sorter for full sort query");
}
return Status::OK();
}
diff --git a/be/src/vec/common/sort/topn_sorter.h b/be/src/vec/common/sort/topn_sorter.h
index 675442f5a1..0cb4da31c2 100644
--- a/be/src/vec/common/sort/topn_sorter.h
+++ b/be/src/vec/common/sort/topn_sorter.h
@@ -30,7 +30,7 @@ public:
~TopNSorter() override = default;
- Status append_block(Block* block, bool* mem_reuse) override;
+ Status append_block(Block* block) override;
Status prepare_for_read() override;
@@ -39,7 +39,7 @@ public:
static constexpr size_t TOPN_SORT_THRESHOLD = 256;
private:
- Status _do_sort(Block* block, bool* mem_reuse);
+ Status _do_sort(Block* block);
std::unique_ptr<MergeSorterState> _state;
};
diff --git a/be/src/vec/core/sort_block.cpp b/be/src/vec/core/sort_block.cpp
index 657fd58d23..1dcab87d62 100644
--- a/be/src/vec/core/sort_block.cpp
+++ b/be/src/vec/core/sort_block.cpp
@@ -114,6 +114,61 @@ void sort_block(Block& block, const SortDescription&
description, UInt64 limit)
}
}
+void sort_block(Block& src_block, Block& dest_block, const SortDescription&
description,
+ UInt64 limit) {
+ if (!src_block) {
+ return;
+ }
+
+ /// If only one column to sort by
+ if (description.size() == 1) {
+ bool reverse = description[0].direction == -1;
+
+ const IColumn* column =
+ !description[0].column_name.empty()
+ ? src_block.get_by_name(description[0].column_name).column.get()
+ : src_block.safe_get_by_position(description[0].column_number).column.get();
+
+ IColumn::Permutation perm;
+ column->get_permutation(reverse, limit,
description[0].nulls_direction, perm);
+
+ size_t columns = src_block.columns();
+ for (size_t i = 0; i < columns; ++i) {
+ dest_block.replace_by_position(
+ i, src_block.get_by_position(i).column->permute(perm,
limit));
+ }
+ } else {
+ size_t size = src_block.rows();
+ IColumn::Permutation perm(size);
+ for (size_t i = 0; i < size; ++i) {
+ perm[i] = i;
+ }
+
+ if (limit >= size) {
+ limit = 0;
+ }
+
+ ColumnsWithSortDescriptions columns_with_sort_desc =
+ get_columns_with_sort_description(src_block, description);
+ {
+ EqualFlags flags(size, 1);
+ EqualRange range {0, size};
+
+ for (size_t i = 0; i < columns_with_sort_desc.size(); i++) {
+ ColumnSorter sorter(columns_with_sort_desc[i], limit);
+ sorter.operator()(flags, perm, range, i ==
columns_with_sort_desc.size() - 1);
+ }
+ }
+
+ size_t columns = src_block.columns();
+ for (size_t i = 0; i < columns; ++i) {
+ dest_block.replace_by_position(
+ i, src_block.get_by_position(i).column->permute(perm,
limit));
+ }
+ }
+ src_block.clear_column_data();
+}
+
void stable_get_permutation(const Block& block, const SortDescription&
description,
IColumn::Permutation& out_permutation) {
if (!block) {
diff --git a/be/src/vec/core/sort_block.h b/be/src/vec/core/sort_block.h
index cc791881c1..f20b0311a7 100644
--- a/be/src/vec/core/sort_block.h
+++ b/be/src/vec/core/sort_block.h
@@ -30,6 +30,9 @@ namespace doris::vectorized {
/// Sort one block by `description`. If limit != 0, then the partial sort of
the first `limit` rows is produced.
void sort_block(Block& block, const SortDescription& description, UInt64 limit
= 0);
+void sort_block(Block& src_block, Block& dest_block, const SortDescription&
description,
+ UInt64 limit = 0);
+
/** Used only in StorageMergeTree to sort the data with INSERT.
* Sorting is stable. This is important for keeping the order of rows in the
CollapsingMergeTree engine
* - because based on the order of rows it is determined whether to delete
or leave groups of rows when collapsing.
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 54323c561c..1764a7f124 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -84,17 +84,13 @@ Status VSortNode::open(RuntimeState* state) {
// The child has been opened and the sorter created. Sort the input.
// The final merge is done on-demand as rows are requested in get_next().
bool eos = false;
- bool mem_reuse = false;
- std::unique_ptr<Block> upstream_block;
+ Block upstream_block;
do {
- if (!mem_reuse) {
- upstream_block.reset(new Block());
- }
RETURN_IF_ERROR_AND_CHECK_SPAN(
- child(0)->get_next_after_projects(state, upstream_block.get(),
&eos),
+ child(0)->get_next_after_projects(state, &upstream_block,
&eos),
child(0)->get_next_span(), eos);
- if (upstream_block->rows() != 0) {
- RETURN_IF_ERROR(_sorter->append_block(upstream_block.get(),
&mem_reuse));
+ if (upstream_block.rows() != 0) {
+ RETURN_IF_ERROR(_sorter->append_block(&upstream_block));
RETURN_IF_CANCELLED(state);
RETURN_IF_ERROR(state->check_query_state("vsort, while sorting
input."));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]