This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 5c020be4d24 [Bug](join) corner case causes the mark join + null-aware
left join to core dump in regression test in pipeline query engine (#25087)
5c020be4d24 is described below
commit 5c020be4d247c6c762cff53e2c1f9d28efa68b20
Author: HappenLee <[email protected]>
AuthorDate: Sun Oct 8 22:50:12 2023 +0800
[Bug](join) corner case causes the mark join + null-aware left join to core
dump in regression test in pipeline query engine (#25087)
---
be/src/pipeline/exec/operator.h | 7 +---
be/src/pipeline/pipeline_task.cpp | 7 +---
be/src/vec/exec/join/vhash_join_node.cpp | 70 +++++++++++++++-----------------
be/src/vec/exec/join/vhash_join_node.h | 4 +-
be/src/vec/exec/join/vjoin_node_base.h | 4 +-
be/src/vec/exec/scan/vscan_node.cpp | 3 --
6 files changed, 39 insertions(+), 56 deletions(-)
diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h
index 73e2d5d41b8..4ba2aec977f 100644
--- a/be/src/pipeline/exec/operator.h
+++ b/be/src/pipeline/exec/operator.h
@@ -450,12 +450,7 @@ public:
if (node->need_more_input_data()) {
_child_block->clear_column_data();
- Status status = child->get_block(state, _child_block.get(),
_child_source_state);
- if (status.is<777>()) {
- LOG(INFO) << "Scan block nullptr error _source_state:" <<
int(source_state)
- << " query id:" << print_id(state->query_id());
- }
- RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(child->get_block(state, _child_block.get(),
_child_source_state));
source_state = _child_source_state;
if (_child_block->rows() == 0 && _child_source_state !=
SourceState::FINISHED) {
return Status::OK();
diff --git a/be/src/pipeline/pipeline_task.cpp
b/be/src/pipeline/pipeline_task.cpp
index 8061253d2ec..a0f77578e73 100644
--- a/be/src/pipeline/pipeline_task.cpp
+++ b/be/src/pipeline/pipeline_task.cpp
@@ -289,12 +289,7 @@ Status PipelineTask::execute(bool* eos) {
{
SCOPED_TIMER(_get_block_timer);
_get_block_counter->update(1);
- auto status = _root->get_block(_state, block, _data_state);
- if (status.is<777>()) {
- LOG(FATAL) << "Scan block nullptr error: can read:" <<
source_can_read()
- << " query id:" << print_id(_state->query_id());
- }
- RETURN_IF_ERROR(status);
+ RETURN_IF_ERROR(_root->get_block(_state, block, _data_state));
}
*eos = _data_state == SourceState::FINISHED;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 75ea6b06ba8..aa91846cc8b 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -418,7 +418,7 @@ Status HashJoinNode::close(RuntimeState* state) {
bool HashJoinNode::need_more_input_data() const {
return (_probe_block.rows() == 0 || _probe_index == _probe_block.rows())
&& !_probe_eos &&
- (!_short_circuit_for_probe || _is_mark_join);
+ !_short_circuit_for_probe;
}
void HashJoinNode::prepare_for_next() {
@@ -430,45 +430,46 @@ void HashJoinNode::prepare_for_next() {
Status HashJoinNode::pull(doris::RuntimeState* state, vectorized::Block*
output_block, bool* eos) {
SCOPED_TIMER(_probe_timer);
if (_short_circuit_for_probe) {
- /// If `_short_circuit_for_probe` is true, this indicates no rows
+ // If we use a short-circuit strategy, should return empty block
directly.
+ *eos = true;
+ return Status::OK();
+ }
+
+ if (_short_circuit_for_null_in_probe_side && _is_mark_join) {
+ /// If `_short_circuit_for_null_in_probe_side` is true, this indicates
no rows
/// match the join condition, and this is 'mark join', so we need to
create a column as mark
/// with all rows set to 0.
- if (_is_mark_join) {
- auto block_rows = _probe_block.rows();
- if (block_rows == 0) {
- *eos = _probe_eos;
- return Status::OK();
- }
-
- Block temp_block;
- //get probe side output column
- for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
- if (_left_output_slot_flags[i]) {
- temp_block.insert(_probe_block.get_by_position(i));
- }
- }
- auto mark_column = ColumnUInt8::create(block_rows, 0);
- temp_block.insert({std::move(mark_column),
std::make_shared<DataTypeUInt8>(), ""});
+ auto block_rows = _probe_block.rows();
+ if (block_rows == 0) {
+ *eos = _probe_eos;
+ return Status::OK();
+ }
- {
- SCOPED_TIMER(_join_filter_timer);
- RETURN_IF_ERROR(
- VExprContext::filter_block(_conjuncts, &temp_block,
temp_block.columns()));
+ Block temp_block;
+ //get probe side output column
+ for (int i = 0; i < _left_output_slot_flags.size(); ++i) {
+ if (_left_output_slot_flags[i]) {
+ temp_block.insert(_probe_block.get_by_position(i));
}
+ }
+ auto mark_column = ColumnUInt8::create(block_rows, 0);
+ temp_block.insert({std::move(mark_column),
std::make_shared<DataTypeUInt8>(), ""});
- RETURN_IF_ERROR(_build_output_block(&temp_block, output_block,
false));
- temp_block.clear();
- release_block_memory(_probe_block);
- reached_limit(output_block, eos);
- return Status::OK();
+ {
+ SCOPED_TIMER(_join_filter_timer);
+ RETURN_IF_ERROR(
+ VExprContext::filter_block(_conjuncts, &temp_block,
temp_block.columns()));
}
- // If we use a short-circuit strategy, should return empty block
directly.
- *eos = true;
+
+ RETURN_IF_ERROR(_build_output_block(&temp_block, output_block, false));
+ temp_block.clear();
+ release_block_memory(_probe_block);
+ reached_limit(output_block, eos);
return Status::OK();
}
//TODO: this short circuit maybe could refactor, no need to check at here.
- if (_short_circuit_for_probe_and_additional_data) {
+ if (_empty_right_table_need_probe_dispose) {
// when build table rows is 0 and not have other_join_conjunct and
join type is one of LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
// we could get the result is probe table + null-column(if need output)
// If we use a short-circuit strategy, should return block directly by
add additional null data.
@@ -641,13 +642,8 @@ Status HashJoinNode::push(RuntimeState* /*state*/,
vectorized::Block* input_bloc
Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool*
eos) {
SCOPED_TIMER(_runtime_profile->total_time_counter());
- if (_is_hash_join_early_start_probe_eos(state)) {
- *eos = true;
- return Status::OK();
- }
-
- if (_short_circuit_for_probe && !_is_mark_join) {
- // If we use a short-circuit strategy, should return empty block
directly.
+ // If we use a short-circuit strategy, should return empty block directly.
+ if (_is_hash_join_early_start_probe_eos(state) ||
_short_circuit_for_probe) {
*eos = true;
return Status::OK();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index c60f1a0c7ae..c75ab58357c 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -577,7 +577,7 @@ private:
void _init_short_circuit_for_probe() override {
_short_circuit_for_probe =
(_short_circuit_for_null_in_probe_side &&
- _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) ||
+ _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN &&
!_is_mark_join) ||
(_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN
&& !_is_mark_join) ||
(_build_blocks->empty() && _join_op ==
TJoinOp::RIGHT_OUTER_JOIN) ||
@@ -586,7 +586,7 @@ private:
//when build table rows is 0 and not have other_join_conjunct and not
_is_mark_join and join type is one of
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
- _short_circuit_for_probe_and_additional_data =
+ _empty_right_table_need_probe_dispose =
(_build_blocks->empty() && !_have_other_join_conjunct &&
!_is_mark_join) &&
(_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::LEFT_ANTI_JOIN);
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index 234374e3c0e..0de7ae11064 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -99,7 +99,7 @@ protected:
virtual void _init_short_circuit_for_probe() {
_short_circuit_for_probe = false;
- _short_circuit_for_probe_and_additional_data = false;
+ _empty_right_table_need_probe_dispose = false;
}
TJoinOp::type _join_op;
@@ -128,7 +128,7 @@ protected:
bool _short_circuit_for_probe = false;
// for some join, when build side rows is empty, we could return directly
by add some additional null data in probe table.
- bool _short_circuit_for_probe_and_additional_data = false;
+ bool _empty_right_table_need_probe_dispose = false;
std::unique_ptr<RowDescriptor> _output_row_desc;
std::unique_ptr<RowDescriptor> _intermediate_row_desc;
// output expr
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index c9c4cc7e1ee..0e6b8a54db8 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -264,9 +264,6 @@ Status VScanNode::get_next(RuntimeState* state,
vectorized::Block* block, bool*
return Status::OK();
}
- if (scan_block == nullptr) {
- return Status::Error<777>("not pointer in scan pipline");
- }
// get scanner's block memory
block->swap(*scan_block);
_scanner_ctx->return_free_block(std::move(scan_block));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]