This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 705989d [improvement](VHashJoin) add probe timer (#8233)
705989d is described below
commit 705989d23916ce115b6ed269221f7be377b74a24
Author: awakeljw <[email protected]>
AuthorDate: Sun Mar 13 20:54:44 2022 +0800
[improvement](VHashJoin) add probe timer (#8233)
---
be/src/vec/exec/join/vhash_join_node.cpp | 217 ++++++++++++++++++-------------
be/src/vec/exec/join/vhash_join_node.h | 3 +
2 files changed, 127 insertions(+), 93 deletions(-)
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index c33bcb2..a1af769 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -166,8 +166,56 @@ struct ProcessHashTableProbe {
_items_counts(join_node->_items_counts),
_build_block_offsets(join_node->_build_block_offsets),
_build_block_rows(join_node->_build_block_rows),
- _rows_returned_counter(join_node->_rows_returned_counter) {}
+ _rows_returned_counter(join_node->_rows_returned_counter),
+ _search_hashtable_timer(join_node->_search_hashtable_timer),
+ _build_side_output_timer(join_node->_build_side_output_timer),
+ _probe_side_output_timer(join_node->_probe_side_output_timer) {}
+
+ // output build side result column
+ void build_side_output_column(MutableColumns& mcol, int column_offset, int
column_length, int size) {
+ constexpr auto is_semi_anti_join = JoinOpType::value ==
TJoinOp::RIGHT_ANTI_JOIN ||
+ JoinOpType::value ==
TJoinOp::RIGHT_SEMI_JOIN ||
+ JoinOpType::value ==
TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType::value ==
TJoinOp::LEFT_SEMI_JOIN;
+ constexpr auto probe_all = JoinOpType::value ==
TJoinOp::LEFT_OUTER_JOIN ||
+ JoinOpType::value ==
TJoinOp::FULL_OUTER_JOIN;
+
+ if constexpr (!is_semi_anti_join) {
+ if (_build_blocks.size() == 1) {
+ for (int i = 0; i < column_length; i++) {
+ auto& column = *_build_blocks[0].get_by_position(i).column;
+ mcol[i + column_offset]->insert_indices_from(column,
+ _build_block_rows.data(), _build_block_rows.data()
+ size);
+ }
+ } else {
+ for (int i = 0; i < column_length; i++) {
+ for (int j = 0; j < size; j++) {
+ if constexpr (probe_all) {
+ if (_build_block_offsets[j] == -1) {
+ DCHECK(mcol[i + column_offset]->is_nullable());
+ assert_cast<ColumnNullable *>(mcol[i +
column_offset].get())->insert_join_null_data();
+ } else {
+ auto& column =
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
+ mcol[i + column_offset]->insert_from(column,
_build_block_rows[j]);
+ }
+ } else {
+ auto& column =
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
+ mcol[i + column_offset]->insert_from(column,
_build_block_rows[j]);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ // output probe side result column
+ void probe_side_output_column(MutableColumns& mcol, int column_length, int
size) {
+ for (int i = 0; i < column_length; ++i) {
+ auto& column = _probe_block.get_by_position(i).column;
+ column->replicate(&_items_counts[0], size, *mcol[i]);
+ }
+ }
// Only process the join with no other join conjunt, because of no other
join conjunt
// the output block struct is same with mutable block. we can do more opt
on it and simplify
// the logic of probe
@@ -198,116 +246,93 @@ struct ProcessHashTableProbe {
constexpr auto is_right_semi_anti_join = JoinOpType::value ==
TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType::value ==
TJoinOp::RIGHT_SEMI_JOIN;
- constexpr auto is_semi_anti_join = is_right_semi_anti_join ||
- JoinOpType::value ==
TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType::value ==
TJoinOp::LEFT_SEMI_JOIN;
-
constexpr auto probe_all = JoinOpType::value ==
TJoinOp::LEFT_OUTER_JOIN ||
JoinOpType::value ==
TJoinOp::FULL_OUTER_JOIN;
- for (; _probe_index < _probe_rows;) {
- if constexpr (ignore_null) {
- if ((*null_map)[_probe_index]) {
- _items_counts[_probe_index++] = (uint32_t)0;
- continue;
- }
- }
- int last_offset = current_offset;
- auto find_result = (*null_map)[_probe_index]
- ?
decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index,
- _arena))
{nullptr, false}
- :
key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);
-
- if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
- if (!find_result.is_found()) {
- ++current_offset;
- }
- } else if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN)
{
- if (find_result.is_found()) {
- ++current_offset;
+ {
+ SCOPED_TIMER(_search_hashtable_timer);
+ for (; _probe_index < _probe_rows;) {
+ if constexpr (ignore_null) {
+ if ((*null_map)[_probe_index]) {
+ _items_counts[_probe_index++] = (uint32_t)0;
+ continue;
+ }
}
- } else {
- if (find_result.is_found()) {
- auto& mapped = find_result.get_mapped();
- // TODO: Iterators are currently considered to be a heavy
operation and have a certain impact on performance.
- // We should rethink whether to use this iterator mode in
the future. Now just opt the one row case
- if (mapped.get_row_count() == 1) {
- if constexpr (need_to_set_visited)
- mapped.visited = true;
-
- if constexpr (!is_right_semi_anti_join) {
- _build_block_offsets[current_offset] =
mapped.block_offset;
- _build_block_rows[current_offset] = mapped.row_num;
- ++current_offset;
- }
- } else {
- // prefetch is more useful while matching to multiple
rows
- if (_probe_index + 2 < _probe_rows)
- key_getter.prefetch(hash_table_ctx.hash_table,
_probe_index + 2, _arena);
+ int last_offset = current_offset;
+ auto find_result = (*null_map)[_probe_index]
+ ?
decltype(key_getter.find_key(hash_table_ctx.hash_table, _probe_index,
+ _arena))
{nullptr, false}
+ :
key_getter.find_key(hash_table_ctx.hash_table, _probe_index, _arena);
+
+ if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
+ if (!find_result.is_found()) {
+ ++current_offset;
+ }
+ } else if constexpr (JoinOpType::value ==
TJoinOp::LEFT_SEMI_JOIN) {
+ if (find_result.is_found()) {
+ ++current_offset;
+ }
+ } else {
+ if (find_result.is_found()) {
+ auto& mapped = find_result.get_mapped();
+ // TODO: Iterators are currently considered to be a
heavy operation and have a certain impact on performance.
+ // We should rethink whether to use this iterator mode
in the future. Now just opt the one row case
+ if (mapped.get_row_count() == 1) {
+ if constexpr (need_to_set_visited)
+ mapped.visited = true;
- for (auto it = mapped.begin(); it.ok(); ++it) {
if constexpr (!is_right_semi_anti_join) {
- if (current_offset < _batch_size) {
- _build_block_offsets[current_offset] =
it->block_offset;
- _build_block_rows[current_offset] =
it->row_num;
- } else {
-
_build_block_offsets.emplace_back(it->block_offset);
-
_build_block_rows.emplace_back(it->row_num);
- }
+ _build_block_offsets[current_offset] =
mapped.block_offset;
+ _build_block_rows[current_offset] =
mapped.row_num;
++current_offset;
}
- if constexpr (need_to_set_visited)
- it->visited = true;
+ } else {
+ // prefetch is more useful while matching to
multiple rows
+ if (_probe_index + 2 < _probe_rows)
+ key_getter.prefetch(hash_table_ctx.hash_table,
_probe_index + 2, _arena);
+
+ for (auto it = mapped.begin(); it.ok(); ++it) {
+ if constexpr (!is_right_semi_anti_join) {
+ if (current_offset < _batch_size) {
+ _build_block_offsets[current_offset] =
it->block_offset;
+ _build_block_rows[current_offset] =
it->row_num;
+ } else {
+
_build_block_offsets.emplace_back(it->block_offset);
+
_build_block_rows.emplace_back(it->row_num);
+ }
+ ++current_offset;
+ }
+ if constexpr (need_to_set_visited)
+ it->visited = true;
+ }
+ }
+ } else {
+ if constexpr (probe_all) {
+ // only full outer / left outer need insert the
data of right table
+ _build_block_offsets[current_offset] = -1;
+ _build_block_rows[current_offset] = -1;
+ ++current_offset;
}
- }
- } else {
- if constexpr (probe_all) {
- // only full outer / left outer need insert the data
of right table
- _build_block_offsets[current_offset] = -1;
- _build_block_rows[current_offset] = -1;
- ++current_offset;
}
}
- }
- _items_counts[_probe_index++] = (uint32_t)(current_offset -
last_offset);
- if (current_offset >= _batch_size) {
- break;
+ _items_counts[_probe_index++] = (uint32_t)(current_offset -
last_offset);
+ if (current_offset >= _batch_size) {
+ break;
+ }
}
}
- // insert all matched build rows
- if constexpr (!is_semi_anti_join) {
- if (_build_blocks.size() == 1) {
- for (int i = 0; i < right_col_len; i++) {
- auto& column = *_build_blocks[0].get_by_position(i).column;
- mcol[i + right_col_idx]->insert_indices_from(column,
- _build_block_rows.data(), _build_block_rows.data()
+ current_offset);
- }
- } else {
- for (int i = 0; i < right_col_len; i++) {
- for (int j = 0; j < current_offset; j++) {
- if constexpr (probe_all) {
- if (_build_block_offsets[j] == -1) {
- DCHECK(mcol[i + right_col_idx]->is_nullable());
- assert_cast<ColumnNullable *>(mcol[i +
right_col_idx].get())->insert_join_null_data();
- } else {
- auto& column =
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
- mcol[i + right_col_idx]->insert_from(column,
_build_block_rows[j]);
- }
- } else {
- auto& column =
*_build_blocks[_build_block_offsets[j]].get_by_position(i).column;
- mcol[i + right_col_idx]->insert_from(column,
_build_block_rows[j]);
- }
- }
- }
- }
+ {
+ SCOPED_TIMER(_build_side_output_timer);
+ build_side_output_column(mcol, right_col_idx, right_col_len,
current_offset);
}
- for (int i = 0; i < right_col_idx; ++i) {
- auto& column = _probe_block.get_by_position(i).column;
- column->replicate(&_items_counts[0], current_offset, *mcol[i]);
+ {
+ SCOPED_TIMER(_probe_side_output_timer);
+ probe_side_output_column(mcol, right_col_idx, current_offset);
}
+
output_block->swap(mutable_block.to_block());
return Status::OK();
@@ -581,6 +606,9 @@ private:
std::vector<int>& _build_block_rows;
ProfileCounter* _rows_returned_counter;
+ ProfileCounter* _search_hashtable_timer;
+ ProfileCounter* _build_side_output_timer;
+ ProfileCounter* _probe_side_output_timer;
};
// now we only support inner join
@@ -697,6 +725,9 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_probe_next_timer = ADD_TIMER(probe_phase_profile, "ProbeFindNextTime");
_probe_expr_call_timer = ADD_TIMER(probe_phase_profile,
"ProbeExprCallTime");
_probe_rows_counter = ADD_COUNTER(probe_phase_profile, "ProbeRows",
TUnit::UNIT);
+ _search_hashtable_timer = ADD_TIMER(probe_phase_profile,
"ProbeWhenSearchHashTableTime");
+ _build_side_output_timer = ADD_TIMER(probe_phase_profile,
"ProbeWhenBuildSideOutputTime");
+ _probe_side_output_timer = ADD_TIMER(probe_phase_profile,
"ProbeWhenProbeSideOutputTime");
_push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
_push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index b50d11f..ca93aaa 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -195,6 +195,9 @@ private:
RuntimeProfile::Counter* _push_compute_timer;
RuntimeProfile::Counter* _build_rows_counter;
RuntimeProfile::Counter* _probe_rows_counter;
+ RuntimeProfile::Counter* _search_hashtable_timer;
+ RuntimeProfile::Counter* _build_side_output_timer;
+ RuntimeProfile::Counter* _probe_side_output_timer;
int64_t _hash_table_rows;
int64_t _mem_used;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]