This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5a1c7f6314 [improvement](analytic) improve memory counter (#14890)
5a1c7f6314 is described below
commit 5a1c7f6314fef589bfc078ca4253a19d2fff87f9
Author: TengJianPing <[email protected]>
AuthorDate: Fri Dec 9 14:13:17 2022 +0800
[improvement](analytic) improve memory counter (#14890)
---
be/src/vec/exec/vanalytic_eval_node.cpp | 32 +++++++++++++++++---------------
be/src/vec/exec/vanalytic_eval_node.h | 9 ++++-----
2 files changed, 21 insertions(+), 20 deletions(-)
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp
b/be/src/vec/exec/vanalytic_eval_node.cpp
index ed1ac17767..5fa25cb1c6 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -287,8 +287,8 @@ Status
VAnalyticEvalNode::_get_next_for_partition(RuntimeState* state, Block* bl
SCOPED_CONSUME_MEM_TRACKER(mem_tracker_growh());
size_t current_block_rows = _input_blocks[_output_block_index].rows();
if (next_partition) {
- _executor.execute(_partition_by_start, _partition_by_end,
_partition_by_start,
- _partition_by_end);
+ _executor.execute(_partition_by_start.pos, _partition_by_end.pos,
+ _partition_by_start.pos, _partition_by_end.pos);
}
_executor.insert_result(current_block_rows);
if (_window_end_position == current_block_rows) {
@@ -312,7 +312,8 @@ Status VAnalyticEvalNode::_get_next_for_range(RuntimeState*
state, Block* block,
_window_end_position < current_block_rows) {
if (_current_row_position >= _order_by_end.pos) {
_update_order_by_range();
- _executor.execute(_order_by_start, _order_by_end,
_order_by_start, _order_by_end);
+ _executor.execute(_order_by_start.pos, _order_by_end.pos,
_order_by_start.pos,
+ _order_by_end.pos);
}
_executor.insert_result(current_block_rows);
}
@@ -335,25 +336,26 @@ Status
VAnalyticEvalNode::_get_next_for_rows(RuntimeState* state, Block* block,
size_t current_block_rows = _input_blocks[_output_block_index].rows();
while (_current_row_position < _partition_by_end.pos &&
_window_end_position < current_block_rows) {
- BlockRowPos range_start, range_end;
+ int64_t range_start, range_end;
if (!_window.__isset.window_start &&
_window.window_end.type ==
TAnalyticWindowBoundaryType::
CURRENT_ROW) { //[preceding,
current_row],[current_row, following]
- range_start.pos = _current_row_position;
- range_end.pos = _current_row_position +
- 1; //going on calculate,add up data, no need
to reset state
+ range_start = _current_row_position;
+ range_end = _current_row_position +
+ 1; //going on calculate,add up data, no need to
reset state
} else {
_reset_agg_status();
if (!_window.__isset
.window_start) { //[preceding, offset]
--unbound: [preceding, following]
- range_start.pos = _partition_by_start.pos;
+ range_start = _partition_by_start.pos;
} else {
- range_start.pos = _current_row_position +
_rows_start_offset;
+ range_start = _current_row_position + _rows_start_offset;
}
- range_end.pos = _current_row_position + _rows_end_offset + 1;
+ range_end = _current_row_position + _rows_end_offset + 1;
}
- _executor.execute(_partition_by_start, _partition_by_end,
range_start, range_end);
+ _executor.execute(_partition_by_start.pos, _partition_by_end.pos,
range_start,
+ range_end);
_executor.insert_result(current_block_rows);
}
if (_window_end_position == current_block_rows) {
@@ -595,6 +597,7 @@ Status VAnalyticEvalNode::_output_current_block(Block*
block) {
block->swap(std::move(_input_blocks[_output_block_index]));
_blocks_memory_usage->add(-block->allocated_bytes());
+ mem_tracker_held()->consume(-block->allocated_bytes());
if (_origin_cols.size() < block->columns()) {
block->erase_not_in(_origin_cols);
}
@@ -618,16 +621,15 @@ Status VAnalyticEvalNode::_output_current_block(Block*
block) {
//now is execute for lead/lag row_number/rank/dense_rank/ntile functions
//sum min max count avg first_value last_value functions
-void VAnalyticEvalNode::_execute_for_win_func(BlockRowPos partition_start,
- BlockRowPos partition_end,
BlockRowPos frame_start,
- BlockRowPos frame_end) {
+void VAnalyticEvalNode::_execute_for_win_func(int64_t partition_start, int64_t
partition_end,
+ int64_t frame_start, int64_t
frame_end) {
for (size_t i = 0; i < _agg_functions_size; ++i) {
std::vector<const IColumn*> _agg_columns;
for (int j = 0; j < _agg_intput_columns[i].size(); ++j) {
_agg_columns.push_back(_agg_intput_columns[i][j].get());
}
_agg_functions[i]->function()->add_range_single_place(
- partition_start.pos, partition_end.pos, frame_start.pos,
frame_end.pos,
+ partition_start, partition_end, frame_start, frame_end,
_fn_place_ptr + _offsets_of_aggregate_states[i],
_agg_columns.data(), nullptr);
}
}
diff --git a/be/src/vec/exec/vanalytic_eval_node.h
b/be/src/vec/exec/vanalytic_eval_node.h
index c39340e94d..a6824a108a 100644
--- a/be/src/vec/exec/vanalytic_eval_node.h
+++ b/be/src/vec/exec/vanalytic_eval_node.h
@@ -58,8 +58,8 @@ private:
Status _get_next_for_range(RuntimeState* state, Block* block, bool* eos);
Status _get_next_for_partition(RuntimeState* state, Block* block, bool*
eos);
- void _execute_for_win_func(BlockRowPos partition_start, BlockRowPos
partition_end,
- BlockRowPos frame_start, BlockRowPos frame_end);
+ void _execute_for_win_func(int64_t partition_start, int64_t partition_end,
int64_t frame_start,
+ int64_t frame_end);
Status _reset_agg_status();
Status _init_result_columns();
@@ -80,9 +80,8 @@ private:
bool whether_need_next_partition(BlockRowPos found_partition_end);
std::string debug_window_bound_string(TAnalyticWindowBoundary b);
- using vectorized_execute =
- std::function<void(BlockRowPos peer_group_start, BlockRowPos
peer_group_end,
- BlockRowPos frame_start, BlockRowPos
frame_end)>;
+ using vectorized_execute = std::function<void(int64_t peer_group_start,
int64_t peer_group_end,
+ int64_t frame_start, int64_t
frame_end)>;
using vectorized_get_next = std::function<Status(RuntimeState* state,
Block* block, bool* eos)>;
using vectorized_get_result = std::function<void(int64_t
current_block_rows)>;
using vectorized_closer = std::function<void()>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]