This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7513c82431 [NLJoin](conjuncts) separate join conjuncts and general conjuncts (#14608)
7513c82431 is described below
commit 7513c82431b4aa2c6d8ab45ddfb6874e3c4fa7fd
Author: Gabriel <[email protected]>
AuthorDate: Tue Nov 29 08:55:54 2022 +0800
[NLJoin](conjuncts) separate join conjuncts and general conjuncts (#14608)
---
be/src/util/runtime_profile.h | 2 +-
be/src/vec/exec/join/vhash_join_node.cpp | 65 +++++++---------
be/src/vec/exec/join/vhash_join_node.h | 15 +---
be/src/vec/exec/join/vjoin_node_base.cpp | 12 +++
be/src/vec/exec/join/vjoin_node_base.h | 9 +++
be/src/vec/exec/join/vnested_loop_join_node.cpp | 87 ++++++++++++++++++++--
be/src/vec/exec/join/vnested_loop_join_node.h | 6 ++
.../doris/analysis/BitmapFilterPredicate.java | 2 +-
.../glue/translator/PhysicalPlanTranslator.java | 5 +-
.../org/apache/doris/planner/HashJoinNode.java | 34 ++-------
.../org/apache/doris/planner/JoinNodeBase.java | 32 +++++++-
.../apache/doris/planner/NestedLoopJoinNode.java | 64 +++++++++-------
.../apache/doris/planner/SingleNodePlanner.java | 7 +-
gensrc/thrift/PlanNodes.thrift | 2 +
.../query_p0/join/test_nestedloop_outer_join.out | 12 +++
.../join/test_nestedloop_outer_join.groovy | 16 ++++
16 files changed, 249 insertions(+), 121 deletions(-)
diff --git a/be/src/util/runtime_profile.h b/be/src/util/runtime_profile.h
index 55b4b2c3d2..62f0365afc 100644
--- a/be/src/util/runtime_profile.h
+++ b/be/src/util/runtime_profile.h
@@ -539,7 +539,7 @@ public:
if (counter == nullptr) {
return;
}
- DCHECK(counter->type() == TUnit::TIME_NS);
+ DCHECK_EQ(counter->type(), TUnit::TIME_NS);
_sw.start();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 2cce0fdff5..35b83e2a8d 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -343,11 +343,6 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
init_output_slots_flags(child(0)->row_desc().tuple_descriptors(),
_left_output_slot_flags);
init_output_slots_flags(child(1)->row_desc().tuple_descriptors(),
_right_output_slot_flags);
- // only use in outer join as the bool column to mark for function of
`tuple_is_null`
- if (_is_outer_join) {
- _tuple_is_null_left_flag_column = ColumnUInt8::create();
- _tuple_is_null_right_flag_column = ColumnUInt8::create();
- }
return Status::OK();
}
@@ -565,7 +560,9 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
return Status::OK();
}
- _add_tuple_is_null_column(&temp_block);
+ if (_is_outer_join) {
+ _add_tuple_is_null_column(&temp_block);
+ }
{
SCOPED_TIMER(_join_filter_timer);
RETURN_IF_ERROR(
@@ -578,6 +575,30 @@ Status HashJoinNode::get_next(RuntimeState* state, Block*
output_block, bool* eo
return st;
}
+void HashJoinNode::_add_tuple_is_null_column(Block* block) {
+ DCHECK(_is_outer_join);
+ auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
+ auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
+ auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0);
+ auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1);
+ auto left_size = left_null_map.size();
+ auto right_size = right_null_map.size();
+
+ if (left_size == 0) {
+ DCHECK_EQ(right_size, block->rows());
+ left_null_map.get_data().resize_fill(right_size, 0);
+ }
+ if (right_size == 0) {
+ DCHECK_EQ(left_size, block->rows());
+ right_null_map.get_data().resize_fill(left_size, 0);
+ }
+
+ block->insert(
+ {std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(),
"left_tuples_is_null"});
+ block->insert(
+ {std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(),
"right_tuples_is_null"});
+}
+
void HashJoinNode::_prepare_probe_block() {
// clear_column_data of _probe_block
if (!_probe_column_disguise_null.empty()) {
@@ -1051,38 +1072,6 @@ std::vector<uint16_t>
HashJoinNode::_convert_block_to_null(Block& block) {
return results;
}
-void HashJoinNode::_add_tuple_is_null_column(Block* block) {
- if (_is_outer_join) {
- auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
- auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
- auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0);
- auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1);
- auto left_size = left_null_map.size();
- auto right_size = right_null_map.size();
-
- if (left_size == 0) {
- DCHECK_EQ(right_size, block->rows());
- left_null_map.get_data().resize_fill(right_size, 0);
- }
- if (right_size == 0) {
- DCHECK_EQ(left_size, block->rows());
- right_null_map.get_data().resize_fill(left_size, 0);
- }
-
- block->insert({std::move(p0),
std::make_shared<vectorized::DataTypeUInt8>(),
- "left_tuples_is_null"});
- block->insert({std::move(p1),
std::make_shared<vectorized::DataTypeUInt8>(),
- "right_tuples_is_null"});
- }
-}
-
-void HashJoinNode::_reset_tuple_is_null_column() {
- if (_is_outer_join) {
-
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear();
-
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear();
- }
-}
-
HashJoinNode::~HashJoinNode() {
if (_shared_hashtable_controller && _should_build_hash_table) {
_shared_hashtable_controller->signal(id());
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 1a35bb2652..a41d3c2f7b 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -251,8 +251,6 @@ private:
RuntimeProfile::Counter* _build_side_compute_hash_timer;
RuntimeProfile::Counter* _build_side_merge_block_timer;
- RuntimeProfile::Counter* _join_filter_timer;
-
RuntimeProfile* _build_phase_profile;
int64_t _mem_used;
@@ -289,12 +287,8 @@ private:
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
- MutableColumnPtr _tuple_is_null_left_flag_column;
- MutableColumnPtr _tuple_is_null_right_flag_column;
-
SharedHashTableContextPtr _shared_hash_table_context = nullptr;
-private:
Status _materialize_build_side(RuntimeState* state) override;
Status _process_build_block(RuntimeState* state, Block& block, uint8_t
offset);
@@ -317,16 +311,13 @@ private:
void _prepare_probe_block();
- // add tuple is null flag column to Block for filter conjunct and output
expr
- void _add_tuple_is_null_column(Block* block);
-
- // reset the tuple is null flag column for the next call
- void _reset_tuple_is_null_column();
-
static std::vector<uint16_t> _convert_block_to_null(Block& block);
void _release_mem();
+ // add tuple is null flag column to Block for filter conjunct and output
expr
+ void _add_tuple_is_null_column(Block* block) override;
+
template <class HashTableContext>
friend struct ProcessHashTableBuild;
diff --git a/be/src/vec/exec/join/vjoin_node_base.cpp
b/be/src/vec/exec/join/vjoin_node_base.cpp
index 731b383340..d5d5280f57 100644
--- a/be/src/vec/exec/join/vjoin_node_base.cpp
+++ b/be/src/vec/exec/join/vjoin_node_base.cpp
@@ -144,6 +144,11 @@ Status VJoinNodeBase::init(const TPlanNode& tnode,
RuntimeState* state) {
_output_expr_ctxs.push_back(ctx);
}
}
+ // only use in outer join as the bool column to mark for function of
`tuple_is_null`
+ if (_is_outer_join) {
+ _tuple_is_null_left_flag_column = ColumnUInt8::create();
+ _tuple_is_null_right_flag_column = ColumnUInt8::create();
+ }
return ExecNode::init(tnode, state);
}
@@ -173,6 +178,13 @@ Status VJoinNodeBase::open(RuntimeState* state) {
return status;
}
+void VJoinNodeBase::_reset_tuple_is_null_column() {
+ if (_is_outer_join) {
+
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_left_flag_column).clear();
+
reinterpret_cast<ColumnUInt8&>(*_tuple_is_null_right_flag_column).clear();
+ }
+}
+
void VJoinNodeBase::_probe_side_open_thread(RuntimeState* state,
std::promise<Status>* status) {
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VJoinNodeBase::_hash_table_build_thread");
SCOPED_ATTACH_TASK(state);
diff --git a/be/src/vec/exec/join/vjoin_node_base.h
b/be/src/vec/exec/join/vjoin_node_base.h
index 81869c373d..f2bdc6eced 100644
--- a/be/src/vec/exec/join/vjoin_node_base.h
+++ b/be/src/vec/exec/join/vjoin_node_base.h
@@ -68,6 +68,11 @@ protected:
// Initialize the join operation.
void _init_join_op();
+ virtual void _add_tuple_is_null_column(Block* block) = 0;
+
+ // reset the tuple is null flag column for the next call
+ void _reset_tuple_is_null_column();
+
// Materialize build relation. For HashJoin, it will build a hash table
while a list of build blocks for NLJoin.
virtual Status _materialize_build_side(RuntimeState* state) = 0;
@@ -97,12 +102,16 @@ protected:
Block _join_block;
+ MutableColumnPtr _tuple_is_null_left_flag_column;
+ MutableColumnPtr _tuple_is_null_right_flag_column;
+
RuntimeProfile::Counter* _build_timer;
RuntimeProfile::Counter* _probe_timer;
RuntimeProfile::Counter* _build_rows_counter;
RuntimeProfile::Counter* _probe_rows_counter;
RuntimeProfile::Counter* _push_down_timer;
RuntimeProfile::Counter* _push_compute_timer;
+ RuntimeProfile::Counter* _join_filter_timer;
};
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.cpp
b/be/src/vec/exec/join/vnested_loop_join_node.cpp
index 356a9a7a5a..246122d5df 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.cpp
+++ b/be/src/vec/exec/join/vnested_loop_join_node.cpp
@@ -83,6 +83,12 @@ Status VNestedLoopJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
_is_output_left_side_only =
tnode.nested_loop_join_node.is_output_left_side_only;
}
+ if (tnode.nested_loop_join_node.__isset.vjoin_conjunct) {
+ _vjoin_conjunct_ptr.reset(new VExprContext*);
+ RETURN_IF_ERROR(VExpr::create_expr_tree(_pool,
tnode.nested_loop_join_node.vjoin_conjunct,
+ _vjoin_conjunct_ptr.get()));
+ }
+
std::vector<TExpr> filter_src_exprs;
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
filter_src_exprs.push_back(_runtime_filter_descs[i].src_expr);
@@ -106,6 +112,7 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
_probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
_push_down_timer = ADD_TIMER(runtime_profile(), "PushDownTime");
_push_compute_timer = ADD_TIMER(runtime_profile(), "PushDownComputeTime");
+ _join_filter_timer = ADD_TIMER(runtime_profile(), "JoinFilterTimer");
// pre-compute the tuple index of build tuples in the output row
int num_build_tuples = child(1)->row_desc().tuple_descriptors().size();
@@ -116,6 +123,9 @@ Status VNestedLoopJoinNode::prepare(RuntimeState* state) {
RETURN_IF_INVALID_TUPLE_IDX(build_tuple_desc->id(), tuple_idx);
}
+ if (_vjoin_conjunct_ptr) {
+ RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->prepare(state,
*_intermediate_row_desc));
+ }
_num_probe_side_columns = child(0)->row_desc().num_materialized_slots();
_num_build_side_columns = child(1)->row_desc().num_materialized_slots();
RETURN_IF_ERROR(VExpr::prepare(_output_expr_ctxs, state,
*_intermediate_row_desc));
@@ -132,6 +142,7 @@ Status VNestedLoopJoinNode::close(RuntimeState* state) {
}
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VNestedLoopJoinNode::close");
VExpr::close(_filter_src_expr_ctxs, state);
+ if (_vjoin_conjunct_ptr) (*_vjoin_conjunct_ptr)->close(state);
_release_mem();
return VJoinNodeBase::close(state);
@@ -191,9 +202,9 @@ Status VNestedLoopJoinNode::get_left_side(RuntimeState*
state, Block* block) {
Status VNestedLoopJoinNode::get_next(RuntimeState* state, Block* block, bool*
eos) {
INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span,
"VNestedLoopJoinNode::get_next");
+ SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_TIMER(_probe_timer);
RETURN_IF_CANCELLED(state);
- SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
if (_is_output_left_side_only) {
@@ -249,6 +260,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state,
Block* block, bool* eo
Status status =
_do_filtering_and_update_visited_flags<set_build_side_flag,
set_probe_side_flag>(
&tmp_block, offset_stack, !_is_left_semi_anti);
+ _update_tuple_is_null_column(&tmp_block);
if (!status.OK()) {
return status;
}
@@ -277,6 +289,7 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state,
Block* block, bool* eo
Status status =
_do_filtering_and_update_visited_flags<set_build_side_flag,
set_probe_side_flag>(
&tmp_block, offset_stack, !_is_right_semi_anti);
+ _update_tuple_is_null_column(&tmp_block);
mutable_join_block = MutableBlock(std::move(tmp_block));
if (!status.OK()) {
return status;
@@ -300,7 +313,16 @@ Status VNestedLoopJoinNode::get_next(RuntimeState* state,
Block* block, bool* eo
: _matched_rows_done;
Block tmp_block = mutable_join_block.to_block(0);
+ if (_is_outer_join) {
+ _add_tuple_is_null_column(&tmp_block);
+ }
+ {
+ SCOPED_TIMER(_join_filter_timer);
+ RETURN_IF_ERROR(
+ VExprContext::filter_block(_vconjunct_ctx_ptr, &tmp_block,
tmp_block.columns()));
+ }
RETURN_IF_ERROR(_build_output_block(&tmp_block, block));
+ _reset_tuple_is_null_column();
reached_limit(block, eos);
return Status::OK();
}
@@ -344,8 +366,38 @@ void
VNestedLoopJoinNode::_process_left_child_block(MutableColumns& dst_columns,
}
}
+void VNestedLoopJoinNode::_update_tuple_is_null_column(Block* block) {
+ if (_is_outer_join) {
+ auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
+ auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
+ auto& left_null_map = reinterpret_cast<ColumnUInt8&>(*p0);
+ auto& right_null_map = reinterpret_cast<ColumnUInt8&>(*p1);
+ auto left_size = left_null_map.size();
+ auto right_size = right_null_map.size();
+
+ if (left_size < block->rows()) {
+ left_null_map.get_data().resize_fill(block->rows(), 0);
+ }
+ if (right_size < block->rows()) {
+ right_null_map.get_data().resize_fill(block->rows(), 0);
+ }
+ }
+}
+
+void VNestedLoopJoinNode::_add_tuple_is_null_column(Block* block) {
+ DCHECK(_is_outer_join);
+ auto p0 = _tuple_is_null_left_flag_column->assume_mutable();
+ auto p1 = _tuple_is_null_right_flag_column->assume_mutable();
+ block->insert(
+ {std::move(p0), std::make_shared<vectorized::DataTypeUInt8>(),
"left_tuples_is_null"});
+ block->insert(
+ {std::move(p1), std::make_shared<vectorized::DataTypeUInt8>(),
"right_tuples_is_null"});
+}
+
template <bool BuildSide, bool IsSemi>
void VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns,
size_t batch_size) {
+ DCHECK_GT(dst_columns.size(), 0);
+ auto pre_size = dst_columns[0]->size();
if constexpr (BuildSide) {
auto build_block_sz = _build_blocks.size();
size_t i = _output_null_idx_build_side;
@@ -378,6 +430,15 @@ void
VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
dst_columns[j]->insert_many_defaults(selector_idx);
}
+ if (_is_outer_join) {
+
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_left_flag_column.get())
+ ->get_data()
+ .resize_fill(pre_size + selector_idx, 1);
+
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_right_flag_column.get())
+ ->get_data()
+ .resize_fill(pre_size + selector_idx, 0);
+ }
+
for (size_t j = 0; j < _num_build_side_columns; ++j) {
auto src_column = cur_block.get_by_position(j);
if (!src_column.column->is_nullable() &&
@@ -403,6 +464,7 @@ void
VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
i++;
break;
}
+ pre_size = dst_columns[0]->size();
}
_output_null_idx_build_side = i;
} else {
@@ -437,6 +499,14 @@ void
VNestedLoopJoinNode::_finalize_current_phase(MutableColumns& dst_columns, s
for (size_t i = 0; i < _num_build_side_columns; ++i) {
dst_columns[_num_probe_side_columns + i]->insert_default();
}
+ if (_is_outer_join) {
+
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_left_flag_column.get())
+ ->get_data()
+ .resize_fill(pre_size + 1, 0);
+
reinterpret_cast<ColumnUInt8*>(_tuple_is_null_right_flag_column.get())
+ ->get_data()
+ .resize_fill(pre_size + 1, 1);
+ }
}
}
@@ -457,10 +527,10 @@ Status
VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(
size_t build_block_idx =
_current_build_pos == 0 ? _build_blocks.size() - 1 :
_current_build_pos - 1;
size_t processed_blocks_num = offset_stack.size();
- if (LIKELY(_vconjunct_ctx_ptr != nullptr && block->rows() > 0)) {
- DCHECK((*_vconjunct_ctx_ptr) != nullptr);
+ if (LIKELY(_vjoin_conjunct_ptr != nullptr && block->rows() > 0)) {
+ DCHECK((*_vjoin_conjunct_ptr) != nullptr);
int result_column_id = -1;
- RETURN_IF_ERROR((*_vconjunct_ctx_ptr)->execute(block,
&result_column_id));
+ RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->execute(block,
&result_column_id));
ColumnPtr filter_column =
block->get_by_position(result_column_id).column;
if (auto* nullable_column =
check_and_get_column<ColumnNullable>(*filter_column)) {
ColumnPtr nested_column = nullable_column->get_nested_column_ptr();
@@ -512,7 +582,7 @@ Status
VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(
bool ret = const_column->get_bool(0);
if (!ret) {
for (size_t i = 0; i < column_to_keep; ++i) {
-
std::move(*block->get_by_position(i).column).assume_mutable()->clear();
+
block->get_by_position(i).column->assume_mutable()->clear();
}
} else {
if constexpr (SetBuildSideFlag) {
@@ -569,6 +639,7 @@ Status
VNestedLoopJoinNode::_do_filtering_and_update_visited_flags(
}
}
#undef CLEAR_BLOCK
+ Block::erase_useless_column(block, column_to_keep);
return Status::OK();
}
@@ -576,6 +647,9 @@ Status VNestedLoopJoinNode::open(RuntimeState* state) {
START_AND_SCOPE_SPAN(state->get_tracer(), span,
"VNestedLoopJoinNode::open")
SCOPED_TIMER(_runtime_profile->total_time_counter());
RETURN_IF_ERROR(VExpr::open(_filter_src_expr_ctxs, state));
+ if (_vjoin_conjunct_ptr) {
+ RETURN_IF_ERROR((*_vjoin_conjunct_ptr)->open(state));
+ }
RETURN_IF_ERROR(VJoinNodeBase::open(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
RETURN_IF_CANCELLED(state);
@@ -602,6 +676,9 @@ void VNestedLoopJoinNode::_release_mem() {
MutableColumns tmp_build_side_visited_flags;
_build_side_visited_flags.swap(tmp_build_side_visited_flags);
+
+ _tuple_is_null_left_flag_column = nullptr;
+ _tuple_is_null_right_flag_column = nullptr;
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vnested_loop_join_node.h
b/be/src/vec/exec/join/vnested_loop_join_node.h
index 7f47b7959e..45644f9e8e 100644
--- a/be/src/vec/exec/join/vnested_loop_join_node.h
+++ b/be/src/vec/exec/join/vnested_loop_join_node.h
@@ -83,6 +83,11 @@ private:
Status get_left_side(RuntimeState* state, Block* block);
+    // for outer joins, pad the tuple-is-null flag columns with 0 (not null) up to the row count of `block`
+ void _update_tuple_is_null_column(Block* block);
+
+ void _add_tuple_is_null_column(Block* block) override;
+
// List of build blocks, constructed in prepare()
Blocks _build_blocks;
// Visited flags for each row in build side.
@@ -115,6 +120,7 @@ private:
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
std::vector<vectorized::VExprContext*> _filter_src_expr_ctxs;
bool _is_output_left_side_only = false;
+ std::unique_ptr<VExprContext*> _vjoin_conjunct_ptr;
friend struct RuntimeFilterBuild;
};
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
index d7e0463016..1802f0917b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/analysis/BitmapFilterPredicate.java
@@ -90,7 +90,7 @@ public class BitmapFilterPredicate extends Predicate {
@Override
protected void toThrift(TExprNode msg) {
- // Unreachable
+ Preconditions.checkArgument(false, "`toThrift` in
BitmapFilterPredicate should not be reached!");
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index ea368bfedf..235f0265b6 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -896,8 +896,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
nestedLoopJoinNode.setChild(0, leftFragment.getPlanRoot());
connectChildFragment(nestedLoopJoinNode, 1, leftFragment,
rightFragment, context);
leftFragment.setPlanRoot(nestedLoopJoinNode);
- nestedLoopJoin.getOtherJoinConjuncts().stream()
- .map(e -> ExpressionTranslator.translate(e,
context)).forEach(nestedLoopJoinNode::addConjunct);
+ List<Expr> joinConjuncts =
nestedLoopJoin.getOtherJoinConjuncts().stream()
+ .map(e -> ExpressionTranslator.translate(e,
context)).collect(Collectors.toList());
+ nestedLoopJoinNode.setJoinConjuncts(joinConjuncts);
if (nestedLoopJoin.isShouldTranslateOutput()) {
// translate output expr on intermediate tuple
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
index ac86bf7bd7..8eb8e85256 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java
@@ -35,7 +35,6 @@ import org.apache.doris.catalog.ColumnStats;
import org.apache.doris.catalog.OlapTable;
import org.apache.doris.catalog.TableIf;
import org.apache.doris.common.CheckedMath;
-import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
@@ -266,38 +265,16 @@ public class HashJoinNode extends JoinNodeBase {
}
}
- // output slots + predicate slots = input slots
@Override
- public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws
NotImplementedException {
- Set<SlotId> result = Sets.newHashSet();
- Preconditions.checkState(outputSlotIds != null);
- // step1: change output slot id to src slot id
- if (vSrcToOutputSMap != null) {
- for (SlotId slotId : outputSlotIds) {
- SlotRef slotRef = new
SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
- Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
- if (srcExpr == null) {
- result.add(slotId);
- } else {
- List<SlotRef> srcSlotRefList = Lists.newArrayList();
- srcExpr.collect(SlotRef.class, srcSlotRefList);
- result.addAll(srcSlotRefList.stream().map(e ->
e.getSlotId()).collect(Collectors.toList()));
- }
- }
- }
+ protected List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer) {
// eq conjunct
- List<SlotId> eqConjunctSlotIds = Lists.newArrayList();
- Expr.getIds(eqJoinConjuncts, null, eqConjunctSlotIds);
- result.addAll(eqConjunctSlotIds);
+ List<SlotId> joinConjunctSlotIds = Lists.newArrayList();
+ Expr.getIds(eqJoinConjuncts, null, joinConjunctSlotIds);
// other conjunct
List<SlotId> otherConjunctSlotIds = Lists.newArrayList();
Expr.getIds(otherJoinConjuncts, null, otherConjunctSlotIds);
- result.addAll(otherConjunctSlotIds);
- // conjunct
- List<SlotId> conjunctSlotIds = Lists.newArrayList();
- Expr.getIds(conjuncts, null, conjunctSlotIds);
- result.addAll(conjunctSlotIds);
- return result;
+ joinConjunctSlotIds.addAll(otherConjunctSlotIds);
+ return joinConjunctSlotIds;
}
@Override
@@ -308,7 +285,6 @@ public class HashJoinNode extends JoinNodeBase {
List<Expr> newEqJoinConjuncts = Expr.substituteList(eqJoinConjuncts,
combinedChildSmap, analyzer, false);
eqJoinConjuncts =
newEqJoinConjuncts.stream().map(entity -> (BinaryPredicate)
entity).collect(Collectors.toList());
- assignedConjuncts = analyzer.getAssignedConjuncts();
otherJoinConjuncts = Expr.substituteList(otherJoinConjuncts,
combinedChildSmap, analyzer, false);
// Only for Vec: create new tuple for join result
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
index b10a6a96e6..d1e94c0f9f 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/JoinNodeBase.java
@@ -40,6 +40,7 @@ import org.apache.doris.thrift.TNullSide;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -50,6 +51,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
public abstract class JoinNodeBase extends PlanNode {
private static final Logger LOG = LogManager.getLogger(JoinNodeBase.class);
@@ -334,7 +336,7 @@ public abstract class JoinNodeBase extends PlanNode {
protected abstract Pair<Boolean, Boolean> needToCopyRightAndLeft();
- protected void computeOtherConjuncts(Analyzer analyzer,
ExprSubstitutionMap originToIntermediateSmap) {}
+ protected abstract void computeOtherConjuncts(Analyzer analyzer,
ExprSubstitutionMap originToIntermediateSmap);
protected void computeIntermediateTuple(Analyzer analyzer) throws
AnalysisException {
// 1. create new tuple
@@ -412,6 +414,34 @@ public abstract class JoinNodeBase extends PlanNode {
TupleIsNullPredicate.substitueListForTupleIsNull(vSrcToOutputSMap.getLhs(),
originTidsToIntermediateTidMap);
}
+ protected abstract List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer
analyzer);
+
+ @Override
+ public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws
NotImplementedException {
+ Set<SlotId> result = Sets.newHashSet();
+ Preconditions.checkState(outputSlotIds != null);
+ // step1: change output slot id to src slot id
+ if (vSrcToOutputSMap != null) {
+ for (SlotId slotId : outputSlotIds) {
+ SlotRef slotRef = new
SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
+ Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
+ if (srcExpr == null) {
+ result.add(slotId);
+ } else {
+ List<SlotRef> srcSlotRefList = Lists.newArrayList();
+ srcExpr.collect(SlotRef.class, srcSlotRefList);
+ result.addAll(srcSlotRefList.stream().map(e ->
e.getSlotId()).collect(Collectors.toList()));
+ }
+ }
+ }
+ result.addAll(computeSlotIdsForJoinConjuncts(analyzer));
+ // conjunct
+ List<SlotId> conjunctSlotIds = Lists.newArrayList();
+ Expr.getIds(conjuncts, null, conjunctSlotIds);
+ result.addAll(conjunctSlotIds);
+ return result;
+ }
+
@Override
public void finalize(Analyzer analyzer) throws UserException {
super.finalize(analyzer);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
index acd84ac986..eb09d679e6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java
@@ -23,11 +23,9 @@ import org.apache.doris.analysis.Expr;
import org.apache.doris.analysis.ExprSubstitutionMap;
import org.apache.doris.analysis.JoinOperator;
import org.apache.doris.analysis.SlotId;
-import org.apache.doris.analysis.SlotRef;
import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
-import org.apache.doris.common.NotImplementedException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.UserException;
import org.apache.doris.common.util.VectorizedUtil;
@@ -38,16 +36,12 @@ import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TPlanNodeType;
import com.google.common.base.MoreObjects;
-import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.Collections;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
* Nested loop join between left child and right child.
@@ -58,6 +52,9 @@ public class NestedLoopJoinNode extends JoinNodeBase {
private boolean isOutputLeftSideOnly = false;
private List<Expr> runtimeFilterExpr = Lists.newArrayList();
+ private List<Expr> joinConjuncts;
+
+ private Expr vJoinConjunct;
public NestedLoopJoinNode(PlanNodeId id, PlanNode outer, PlanNode inner,
TableRef innerRef) {
super(id, "NESTED LOOP JOIN", StatisticalType.NESTED_LOOP_JOIN_NODE,
outer, inner, innerRef);
@@ -71,29 +68,16 @@ public class NestedLoopJoinNode extends JoinNodeBase {
|| joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
}
+ public void setJoinConjuncts(List<Expr> joinConjuncts) {
+ this.joinConjuncts = joinConjuncts;
+ }
+
@Override
- public Set<SlotId> computeInputSlotIds(Analyzer analyzer) throws
NotImplementedException {
- Set<SlotId> result = Sets.newHashSet();
- Preconditions.checkState(outputSlotIds != null);
- // step1: change output slot id to src slot id
- if (vSrcToOutputSMap != null) {
- for (SlotId slotId : outputSlotIds) {
- SlotRef slotRef = new
SlotRef(analyzer.getDescTbl().getSlotDesc(slotId));
- Expr srcExpr = vSrcToOutputSMap.mappingForRhsExpr(slotRef);
- if (srcExpr == null) {
- result.add(slotId);
- } else {
- List<SlotRef> srcSlotRefList = Lists.newArrayList();
- srcExpr.collect(SlotRef.class, srcSlotRefList);
- result.addAll(srcSlotRefList.stream().map(e ->
e.getSlotId()).collect(Collectors.toList()));
- }
- }
- }
+ protected List<SlotId> computeSlotIdsForJoinConjuncts(Analyzer analyzer) {
// conjunct
List<SlotId> conjunctSlotIds = Lists.newArrayList();
- Expr.getIds(conjuncts, null, conjunctSlotIds);
- result.addAll(conjunctSlotIds);
- return result;
+ Expr.getIds(joinConjuncts, null, conjunctSlotIds);
+ return conjunctSlotIds;
}
@Override
@@ -161,6 +145,25 @@ public class NestedLoopJoinNode extends JoinNodeBase {
LOG.debug("stats NestedLoopJoin: cardinality={}",
Long.toString(cardinality));
}
+ @Override
+ protected void computeOtherConjuncts(Analyzer analyzer,
ExprSubstitutionMap originToIntermediateSmap) {
+ joinConjuncts = Expr.substituteList(joinConjuncts,
originToIntermediateSmap, analyzer, false);
+ if (vJoinConjunct != null) {
+ vJoinConjunct =
+
Expr.substituteList(Collections.singletonList(vJoinConjunct),
originToIntermediateSmap, analyzer,
+ false).get(0);
+ }
+ }
+
+ @Override
+ public void convertToVectoriezd() {
+ if (!joinConjuncts.isEmpty()) {
+ vJoinConjunct =
convertConjunctsToAndCompoundPredicate(joinConjuncts);
+ initCompoundPredicate(vJoinConjunct);
+ }
+ super.convertToVectoriezd();
+ }
+
@Override
protected String debugString() {
return
MoreObjects.toStringHelper(this).addValue(super.debugString()).toString();
@@ -170,6 +173,9 @@ public class NestedLoopJoinNode extends JoinNodeBase {
protected void toThrift(TPlanNode msg) {
msg.nested_loop_join_node = new TNestedLoopJoinNode();
msg.nested_loop_join_node.join_op = joinOp.toThrift();
+ if (vJoinConjunct != null) {
+
msg.nested_loop_join_node.setVjoinConjunct(vJoinConjunct.treeToThrift());
+ }
if (vSrcToOutputSMap != null) {
for (int i = 0; i < vSrcToOutputSMap.size(); i++) {
// TODO: Enable it after we support new optimizers
@@ -196,6 +202,8 @@ public class NestedLoopJoinNode extends JoinNodeBase {
@Override
public void init(Analyzer analyzer) throws UserException {
super.init(analyzer);
+ ExprSubstitutionMap combinedChildSmap =
getCombinedChildWithoutTupleIsNullSmap();
+ joinConjuncts = Expr.substituteList(joinConjuncts, combinedChildSmap,
analyzer, false);
computeCrossRuntimeFilterExpr();
// Only for Vec: create new tuple for join result
@@ -226,6 +234,10 @@ public class NestedLoopJoinNode extends JoinNodeBase {
return output.toString();
}
+ if (!joinConjuncts.isEmpty()) {
+ output.append(detailPrefix).append("join conjuncts:
").append(getExplainString(joinConjuncts)).append("\n");
+ }
+
if (!conjuncts.isEmpty()) {
output.append(detailPrefix).append("predicates:
").append(getExplainString(conjuncts)).append("\n");
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
index 8bf52d3730..e9b50549c8 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SingleNodePlanner.java
@@ -2065,14 +2065,9 @@ public class SingleNodePlanner {
}
analyzer.markConjunctsAssigned(ojConjuncts);
if (eqJoinConjuncts.isEmpty()) {
- // construct cross join node
- // LOG.debug("Join between {} and {} requires at least one
conjunctive"
- // + " equality predicate between the two tables",
- // outerRef.getAliasAsName(), innerRef.getAliasAsName());
- // TODO If there are eq join predicates then we should construct a
hash join
NestedLoopJoinNode result =
new NestedLoopJoinNode(ctx.getNextNodeId(), outer, inner,
innerRef);
- result.addConjuncts(ojConjuncts);
+ result.setJoinConjuncts(ojConjuncts);
result.init(analyzer);
return result;
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index 2ba5c0c6b2..2725c67acc 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -615,6 +615,8 @@ struct TNestedLoopJoinNode {
// for bitmap filer, don't need to join, but output left child tuple
5: optional bool is_output_left_side_only
+
+ 6: optional Exprs.TExpr vjoin_conjunct
}
struct TMergeJoinNode {
diff --git a/regression-test/data/query_p0/join/test_nestedloop_outer_join.out
b/regression-test/data/query_p0/join/test_nestedloop_outer_join.out
index 5cde383a94..dfe581bd65 100644
--- a/regression-test/data/query_p0/join/test_nestedloop_outer_join.out
+++ b/regression-test/data/query_p0/join/test_nestedloop_outer_join.out
@@ -75,3 +75,15 @@
2 2 4 4
3 3 4 4
+-- !join --
+1 1 2 1
+
+-- !join --
+1 1 2 1
+
+-- !join --
+1 1 2 1
+
+-- !join --
+1 1 2 1
+
diff --git
a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy
b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy
index 5ed53d0402..ad19e55469 100644
--- a/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy
+++ b/regression-test/suites/query_p0/join/test_nestedloop_outer_join.groovy
@@ -103,7 +103,23 @@ suite("test_nestedloop_outer_join", "query_p0") {
select * from ${tbl1} inner join ${tbl2} on ${tbl1}.user_id <
${tbl2}.user_id order by ${tbl1}.user_id, ${tbl2}.user_id;
"""
+ sql """ INSERT INTO ${tbl2} VALUES (2, 1); """
+ qt_join """
+ select * from ${tbl1} full outer join ${tbl2} on ${tbl1}.user_id <
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by
${tbl1}.user_id, ${tbl2}.user_id;
+ """
+
+ qt_join """
+ select * from ${tbl1} right outer join ${tbl2} on ${tbl1}.user_id <
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by
${tbl1}.user_id, ${tbl2}.user_id;
+ """
+
+ qt_join """
+ select * from ${tbl1} left outer join ${tbl2} on ${tbl1}.user_id <
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by
${tbl1}.user_id, ${tbl2}.user_id;
+ """
+
+ qt_join """
+ select * from ${tbl1} inner join ${tbl2} on ${tbl1}.user_id <
${tbl2}.user_id where ${tbl1}.user_id2 = ${tbl2}.user_id2 order by
${tbl1}.user_id, ${tbl2}.user_id;
+ """
sql "DROP TABLE IF EXISTS ${tbl1}"
sql "DROP TABLE IF EXISTS ${tbl2}"
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]