This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new cdf5f0fe687 [fix](pipelineX) mark join column should be nullable (#25275)
cdf5f0fe687 is described below
commit cdf5f0fe687091087402a4e36c253548e21ac541
Author: Mryange <[email protected]>
AuthorDate: Wed Oct 11 11:35:43 2023 +0800
[fix](pipelineX) mark join column should be nullable (#25275)
---
be/src/pipeline/exec/hashjoin_build_sink.cpp | 11 +++++----
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 24 +++++++++++--------
be/src/pipeline/exec/join_build_sink_operator.h | 2 --
be/src/pipeline/exec/join_probe_operator.cpp | 8 +++----
be/src/pipeline/exec/join_probe_operator.h | 1 +
.../exec/nested_loop_join_probe_operator.cpp | 27 ++++++++--------------
be/src/pipeline/pipeline_x/dependency.h | 1 +
be/src/vec/exec/join/vhash_join_node.cpp | 3 ++-
8 files changed, 39 insertions(+), 38 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp
b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 8dd84dfd27b..3b0342a926b 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -139,7 +139,8 @@ Status HashJoinBuildSinkLocalState::open(RuntimeState*
state) {
void HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->short_circuit_for_probe =
- (_has_null_in_build_side && p._join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
+ (_shared_state->_has_null_in_build_side &&
+ p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!p._is_mark_join) ||
(_shared_state->build_blocks->empty() && p._join_op ==
TJoinOp::INNER_JOIN &&
!p._is_mark_join) ||
(_shared_state->build_blocks->empty() && p._join_op ==
TJoinOp::LEFT_SEMI_JOIN &&
@@ -203,7 +204,7 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
has_null_value ||
short_circuit_for_null_in_build_side
? &null_map_val->get_data()
: nullptr,
- &_has_null_in_build_side);
+
&_shared_state->_has_null_in_build_side);
}},
*_shared_state->hash_table_variants,
vectorized::make_bool_variant(_build_side_ignore_null),
@@ -452,7 +453,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
// make one block for each 4 gigabytes
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
- if (local_state._has_null_in_build_side) {
+ if (local_state._shared_state->_has_null_in_build_side) {
// TODO: if _has_null_in_build_side is true we should finish current
pipeline task.
DCHECK(state->enable_pipeline_exec());
return Status::OK();
@@ -538,7 +539,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
_shared_hash_table_context->hash_table_variants =
local_state._shared_state->hash_table_variants;
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
- local_state._has_null_in_build_side;
+ local_state._shared_state->_has_null_in_build_side;
if (local_state._runtime_filter_slots) {
local_state._runtime_filter_slots->copy_to_shared_context(
_shared_hash_table_context);
@@ -556,7 +557,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
local_state.profile()->add_info_string(
"SharedHashTableFrom",
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
- local_state._has_null_in_build_side =
+ local_state._shared_state->_has_null_in_build_side =
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
local_state._shared_state->hash_table_variants =
std::static_pointer_cast<vectorized::HashTableVariants>(
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index 0a4b528be38..a4a66507085 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -19,6 +19,7 @@
#include <string>
+#include "common/logging.h"
#include "pipeline/exec/operator.h"
namespace doris {
@@ -184,9 +185,15 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
local_state.init_for_probe(state);
SCOPED_TIMER(local_state._probe_timer);
if (local_state._shared_state->short_circuit_for_probe) {
- /// If `_short_circuit_for_probe` is true, this indicates no rows
- /// match the join condition, and this is 'mark join', so we need to
create a column as mark
- /// with all rows set to 0.
+ // If we use a short-circuit strategy, should return empty block
directly.
+ source_state = SourceState::FINISHED;
+ return Status::OK();
+ }
+ if (local_state._shared_state->_has_null_in_build_side &&
+ _short_circuit_for_null_in_build_side) {
+ /// `_has_null_in_build_side` means have null value in build side.
+ /// `_short_circuit_for_null_in_build_side` means short circuit if has
null in build side(e.g. null aware left anti join).
+ /// We need to create a column as mark with all rows set to NULL.
if (_is_mark_join) {
auto block_rows = local_state._probe_block.rows();
if (block_rows == 0) {
@@ -203,9 +210,11 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
temp_block.insert(local_state._probe_block.get_by_position(i));
}
}
- auto mark_column = vectorized::ColumnUInt8::create(block_rows, 0);
- temp_block.insert(
- {std::move(mark_column),
std::make_shared<vectorized::DataTypeUInt8>(), ""});
+ auto mark_column = vectorized::ColumnNullable::create(
+ vectorized::ColumnUInt8::create(block_rows, 0),
+ vectorized::ColumnUInt8::create(block_rows, 1));
+ temp_block.insert({std::move(mark_column),
+
make_nullable(std::make_shared<vectorized::DataTypeUInt8>()), ""});
{
SCOPED_TIMER(local_state._join_filter_timer);
@@ -220,9 +229,6 @@ Status HashJoinProbeOperatorX::pull(doris::RuntimeState*
state, vectorized::Bloc
local_state.reached_limit(output_block, source_state);
return Status::OK();
}
- // If we use a short-circuit strategy, should return empty block
directly.
- source_state = SourceState::FINISHED;
- return Status::OK();
}
local_state._join_block.clear_column_data();
diff --git a/be/src/pipeline/exec/join_build_sink_operator.h
b/be/src/pipeline/exec/join_build_sink_operator.h
index 2f7a3ec03ea..5f440836874 100644
--- a/be/src/pipeline/exec/join_build_sink_operator.h
+++ b/be/src/pipeline/exec/join_build_sink_operator.h
@@ -40,8 +40,6 @@ protected:
template <typename LocalStateType>
friend class JoinBuildSinkOperatorX;
- bool _has_null_in_build_side = false;
-
RuntimeProfile::Counter* _build_rows_counter;
RuntimeProfile::Counter* _push_down_timer;
RuntimeProfile::Counter* _push_compute_timer;
diff --git a/be/src/pipeline/exec/join_probe_operator.cpp
b/be/src/pipeline/exec/join_probe_operator.cpp
index c4776afe019..63074bed70c 100644
--- a/be/src/pipeline/exec/join_probe_operator.cpp
+++ b/be/src/pipeline/exec/join_probe_operator.cpp
@@ -66,9 +66,8 @@ void JoinProbeLocalState<DependencyType,
Derived>::_construct_mutable_join_block
}
}
if (p._is_mark_join) {
- _join_block.replace_by_position(
- _join_block.columns() - 1,
-
remove_nullable(_join_block.get_by_position(_join_block.columns() - 1).column));
+ DCHECK(!p._is_mark_join ||
+ _join_block.get_by_position(_join_block.columns() -
1).column->is_nullable());
}
}
@@ -183,7 +182,8 @@
JoinProbeOperatorX<LocalStateType>::JoinProbeOperatorX(ObjectPool* pool, const T
?
tnode.nested_loop_join_node.is_mark
: false)
: tnode.hash_join_node.__isset.is_mark ?
tnode.hash_join_node.is_mark
- : false) {
+ : false),
+ _short_circuit_for_null_in_build_side(_join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (tnode.__isset.hash_join_node) {
_intermediate_row_desc.reset(new RowDescriptor(
descs, tnode.hash_join_node.vintermediate_tuple_id_list,
diff --git a/be/src/pipeline/exec/join_probe_operator.h
b/be/src/pipeline/exec/join_probe_operator.h
index 863160b83f6..5727318a4f4 100644
--- a/be/src/pipeline/exec/join_probe_operator.h
+++ b/be/src/pipeline/exec/join_probe_operator.h
@@ -113,6 +113,7 @@ protected:
// output expr
vectorized::VExprContextSPtrs _output_expr_ctxs;
OperatorXPtr _build_side_child;
+ const bool _short_circuit_for_null_in_build_side;
};
} // namespace pipeline
diff --git a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
index ecac7c94dd1..14e19dd352f 100644
--- a/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
+++ b/be/src/pipeline/exec/nested_loop_join_probe_operator.cpp
@@ -22,6 +22,7 @@
#include "pipeline/exec/operator.h"
#include "vec/core/block.h"
#include "vec/exec/join/vnested_loop_join_node.h"
+#include "vec/columns/column_filter_helper.h"
namespace doris {
class RuntimeState;
@@ -102,12 +103,9 @@ void
NestedLoopJoinProbeLocalState::_update_additional_flags(vectorized::Block*
}
}
if (p._is_mark_join) {
- vectorized::IColumn::Filter& mark_data =
-
assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>(
- *block->get_by_position(block->columns() -
1).column->assume_mutable())
- .get_data();
- if (mark_data.size() < block->rows()) {
- mark_data.resize_fill(block->rows(), 1);
+ auto mark_column = block->get_by_position(block->columns() -
1).column->assume_mutable();
+ if (mark_column->size() < block->rows()) {
+
vectorized::ColumnFilterHelper(*mark_column).resize_fill(block->rows(), 1);
}
}
}
@@ -343,15 +341,12 @@ void
NestedLoopJoinProbeLocalState::_finalize_current_phase(vectorized::MutableB
_resize_fill_tuple_is_null_column(new_size, 0, 1);
}
} else {
- vectorized::IColumn::Filter& mark_data =
-
assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>(
- *dst_columns[dst_columns.size() - 1])
- .get_data();
- mark_data.reserve(mark_data.size() + _left_side_process_count);
+ vectorized::ColumnFilterHelper
mark_column(*dst_columns[dst_columns.size() - 1]);
+ mark_column.reserve(mark_column.size() + _left_side_process_count);
DCHECK_LE(_left_block_start_pos + _left_side_process_count,
_child_block->rows());
for (int j = _left_block_start_pos;
j < _left_block_start_pos + _left_side_process_count; ++j) {
- mark_data.emplace_back(IsSemi ==
_cur_probe_row_visited_flags[j]);
+ mark_column.insert_value(IsSemi ==
_cur_probe_row_visited_flags[j]);
}
for (size_t i = 0; i < p._num_probe_side_columns; ++i) {
const vectorized::ColumnWithTypeAndName src_column =
@@ -396,11 +391,9 @@ void
NestedLoopJoinProbeLocalState::_append_left_data_with_null(
for (size_t i = 0; i < p._num_build_side_columns; ++i) {
dst_columns[p._num_probe_side_columns +
i]->insert_many_defaults(_left_side_process_count);
}
- vectorized::IColumn::Filter& mark_data =
- assert_cast<doris::vectorized::ColumnVector<vectorized::UInt8>&>(
- *dst_columns[dst_columns.size() - 1])
- .get_data();
- mark_data.resize_fill(mark_data.size() + _left_side_process_count, 0);
+ auto& mark_column = *dst_columns[dst_columns.size() - 1];
+ vectorized::ColumnFilterHelper(mark_column)
+ .resize_fill(mark_column.size() + _left_side_process_count, 0);
}
void NestedLoopJoinProbeLocalState::_process_left_child_block(
diff --git a/be/src/pipeline/pipeline_x/dependency.h
b/be/src/pipeline/pipeline_x/dependency.h
index 5c9ebb452e1..11e6a0b5c93 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -548,6 +548,7 @@ struct JoinSharedState {
// For some join case, we can apply a short circuit strategy
// 1. _has_null_in_build_side = true
// 2. build side rows is empty, Join op is: inner join/right outer
join/left semi/right semi/right anti
+ bool _has_null_in_build_side = false;
bool short_circuit_for_probe = false;
vectorized::JoinOpVariants join_op_variants;
};
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 5f769e4cafe..3ec7d364d56 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -159,7 +159,8 @@
HashJoinProbeContext::HashJoinProbeContext(pipeline::HashJoinProbeLocalState* lo
_probe_key_sz(local_state->_shared_state->probe_key_sz),
_left_output_slot_flags(&local_state->join_probe()->_left_output_slot_flags),
_right_output_slot_flags(&local_state->join_probe()->_right_output_slot_flags),
-
_is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output) {}
+
_is_any_probe_match_row_output(&local_state->_is_any_probe_match_row_output),
+
_has_null_value_in_build_side(local_state->_shared_state->_has_null_in_build_side)
{}
HashJoinBuildContext::HashJoinBuildContext(HashJoinNode* join_node)
: _hash_table_memory_usage(join_node->_hash_table_memory_usage),
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]