This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bfba058ecf [Feature](join) Support null aware left anti join (#13871)
bfba058ecf is described below
commit bfba058ecf4b3c41ee4d7f5eced22bf9ab8f74a5
Author: Gabriel <[email protected]>
AuthorDate: Thu Nov 3 12:11:25 2022 +0800
[Feature](join) Support null aware left anti join (#13871)
---
be/src/vec/exec/join/vhash_join_node.cpp | 98 +++++++++++++++-------
be/src/vec/exec/join/vhash_join_node.h | 18 ++--
.../org/apache/doris/analysis/JoinOperator.java | 3 +-
.../org/apache/doris/analysis/StmtRewriter.java | 8 +-
.../java/org/apache/doris/analysis/TableRef.java | 2 +
.../apache/doris/nereids/trees/plans/JoinType.java | 13 ++-
.../doris/planner/DistributedPlanColocateRule.java | 2 +
.../apache/doris/planner/DistributedPlanner.java | 30 +++++--
.../doris/planner/RuntimeFilterGenerator.java | 3 +-
.../org/apache/doris/rewrite/ExprRewriter.java | 4 +-
.../org/apache/doris/rewrite/InferFiltersRule.java | 6 +-
.../test_null_aware_left_anti_join.out | 10 +++
.../test_null_aware_left_anti_join.groovy | 66 +++++++++++++++
.../tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy | 4 +-
14 files changed, 207 insertions(+), 60 deletions(-)
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index eb419eb52f..a478df8744 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -59,8 +59,8 @@ struct ProcessHashTableBuild {
_build_side_compute_hash_timer(join_node->_build_side_compute_hash_timer) {}
template <bool need_null_map_for_build, bool ignore_null, bool
build_unique,
- bool has_runtime_filter>
- void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map) {
+ bool has_runtime_filter, bool short_circuit_for_null>
+ void run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map, bool*
has_null_key) {
using KeyGetter = typename HashTableContext::State;
using Mapped = typename HashTableContext::Mapped;
int64_t old_bucket_bytes =
hash_table_ctx.hash_table.get_buffer_size_in_bytes();
@@ -97,6 +97,15 @@ struct ProcessHashTableBuild {
continue;
}
}
+ // If we apply the short-circuit strategy for null values (e.g. the join
operator is
+ // NULL_AWARE_LEFT_ANTI_JOIN), we build the hash table until we
meet a null value.
+ if constexpr (short_circuit_for_null &&
need_null_map_for_build) {
+ if ((*null_map)[k]) {
+ DCHECK(has_null_key);
+ *has_null_key = true;
+ return;
+ }
+ }
if constexpr
(IsSerializedHashTableContextTraits<KeyGetter>::value) {
_build_side_hash_values[k] =
hash_table_ctx.hash_table.hash(key_getter.get_key_holder(k, arena).key);
@@ -218,6 +227,7 @@ void ProcessHashTableProbe<JoinOpType,
ignore_null>::build_side_output_column(
constexpr auto is_semi_anti_join = JoinOpType::value ==
TJoinOp::RIGHT_ANTI_JOIN ||
JoinOpType::value ==
TJoinOp::RIGHT_SEMI_JOIN ||
JoinOpType::value ==
TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType::value ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
JoinOpType::value ==
TJoinOp::LEFT_SEMI_JOIN;
constexpr auto probe_all = JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN ||
@@ -380,7 +390,8 @@ Status ProcessHashTableProbe<JoinOpType,
ignore_null>::do_process(HashTableType&
key_getter.template prefetch<true>(hash_table_ctx.hash_table,
probe_index +
PREFETCH_STEP, _arena);
- if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN) {
+ if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType::value ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
if (!find_result.is_found()) {
++current_offset;
}
@@ -575,7 +586,8 @@ Status ProcessHashTableProbe<JoinOpType,
ignore_null>::do_process_with_other_joi
}
} else if constexpr (JoinOpType::value == TJoinOp::LEFT_OUTER_JOIN
||
JoinOpType::value == TJoinOp::FULL_OUTER_JOIN
||
- JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN)
{
+ JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN
||
+ JoinOpType::value ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
same_to_prev.emplace_back(false);
visited_map.emplace_back(nullptr);
// only full outer / left outer need insert the data of right
table
@@ -682,16 +694,23 @@ Status ProcessHashTableProbe<JoinOpType,
ignore_null>::do_process_with_other_joi
output_block->get_by_position(result_column_id).column =
std::move(new_filter_column);
- } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN)
{
+ } else if constexpr (JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN
||
+ JoinOpType::value ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
auto new_filter_column = ColumnVector<UInt8>::create();
auto& filter_map = new_filter_column->get_data();
if (!column->empty()) {
+ // Both equal conjuncts and other conjuncts are true
filter_map.emplace_back(column->get_bool(0) &&
visited_map[0]);
}
for (int i = 1; i < column->size(); ++i) {
if ((visited_map[i] && column->get_bool(i)) ||
(same_to_prev[i] && filter_map[i - 1])) {
+ // When either of two conditions is met:
+ // 1. Both equal conjuncts and other conjuncts are
true or same_to_prev
+ // 2. This row is joined from the same build side row
as the previous row
+ // Set filter_map[i] to true and filter_map[i - 1] to
false if same_to_prev[i]
+ // is true.
filter_map.push_back(true);
filter_map[i - 1] = !same_to_prev[i] && filter_map[i -
1];
} else {
@@ -731,8 +750,10 @@ Status ProcessHashTableProbe<JoinOpType,
ignore_null>::do_process_with_other_joi
output_block->clear();
} else {
if constexpr (JoinOpType::value == TJoinOp::LEFT_SEMI_JOIN ||
- JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN)
+ JoinOpType::value == TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType::value ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
orig_columns = right_col_idx;
+ }
Block::filter_block(output_block, result_column_id,
orig_columns);
}
}
@@ -828,14 +849,16 @@ Status ProcessHashTableProbe<JoinOpType,
ignore_null>::process_data_in_hashtable
HashJoinNode::HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs)
: ExecNode(pool, tnode, descs),
_join_op(tnode.hash_join_node.join_op),
- _hash_table_rows(0),
_mem_used(0),
+
_have_other_join_conjunct(tnode.hash_join_node.__isset.vother_join_conjunct),
_match_all_probe(_join_op == TJoinOp::LEFT_OUTER_JOIN ||
_join_op == TJoinOp::FULL_OUTER_JOIN),
- _match_one_build(_join_op == TJoinOp::LEFT_SEMI_JOIN),
_match_all_build(_join_op == TJoinOp::RIGHT_OUTER_JOIN ||
_join_op == TJoinOp::FULL_OUTER_JOIN),
- _build_unique(_join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op ==
TJoinOp::LEFT_SEMI_JOIN),
+ _build_unique(!_have_other_join_conjunct &&
+ (_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ _join_op == TJoinOp::LEFT_ANTI_JOIN ||
+ _join_op == TJoinOp::LEFT_SEMI_JOIN)),
_is_right_semi_anti(_join_op == TJoinOp::RIGHT_ANTI_JOIN ||
_join_op == TJoinOp::RIGHT_SEMI_JOIN),
_is_outer_join(_match_all_build || _match_all_probe),
@@ -874,17 +897,17 @@ void HashJoinNode::init_join_op() {
Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
RETURN_IF_ERROR(ExecNode::init(tnode, state));
DCHECK(tnode.__isset.hash_join_node);
- if (tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
- return Status::InternalError("Do not support null aware left anti
join");
- }
const bool build_stores_null = _join_op == TJoinOp::RIGHT_OUTER_JOIN ||
_join_op == TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::RIGHT_ANTI_JOIN;
const bool probe_dispose_null =
- _match_all_probe || _build_unique || _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
+ _match_all_probe || _build_unique || _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+ _join_op == TJoinOp::LEFT_ANTI_JOIN || _join_op ==
TJoinOp::LEFT_SEMI_JOIN;
const std::vector<TEqJoinCondition>& eq_join_conjuncts =
tnode.hash_join_node.eq_join_conjuncts;
+ std::vector<bool> probe_not_ignore_null(eq_join_conjuncts.size());
+ size_t conjuncts_index = 0;
for (const auto& eq_join_conjunct : eq_join_conjuncts) {
VExprContext* ctx = nullptr;
RETURN_IF_ERROR(VExpr::create_expr_tree(_pool, eq_join_conjunct.left,
&ctx));
@@ -897,17 +920,20 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
_is_null_safe_eq_join.push_back(null_aware);
// if is null aware, build join column and probe join column both need
dispose null value
- _build_not_ignore_null.emplace_back(
+ _store_null_in_hash_table.emplace_back(
null_aware ||
(_build_expr_ctxs.back()->root()->is_nullable() &&
build_stores_null));
- _probe_not_ignore_null.emplace_back(
+ probe_not_ignore_null[conjuncts_index] =
null_aware ||
- (_probe_expr_ctxs.back()->root()->is_nullable() &&
probe_dispose_null));
- _build_side_ignore_null |= !_build_not_ignore_null.back();
+ (_probe_expr_ctxs.back()->root()->is_nullable() &&
probe_dispose_null);
+ _build_side_ignore_null |= (_join_op !=
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
+ !_store_null_in_hash_table.back());
+ conjuncts_index++;
}
for (size_t i = 0; i < _probe_expr_ctxs.size(); ++i) {
- _probe_ignore_null |= !_probe_not_ignore_null[i];
+ _probe_ignore_null |= !probe_not_ignore_null[i];
}
+ _short_circuit_for_null_in_build_side = _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN;
_probe_column_disguise_null.reserve(eq_join_conjuncts.size());
@@ -918,8 +944,8 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
// If LEFT SEMI JOIN/LEFT ANTI JOIN with not equal predicate,
// build table should not be deduplicated.
- _build_unique = false;
- _have_other_join_conjunct = true;
+ DCHECK(!_build_unique);
+ DCHECK(_have_other_join_conjunct);
}
const auto& output_exprs = tnode.hash_join_node.srcExprList;
@@ -1057,6 +1083,12 @@ Status HashJoinNode::get_next(RuntimeState* state,
Block* output_block, bool* eo
SCOPED_TIMER(_runtime_profile->total_time_counter());
SCOPED_TIMER(_probe_timer);
+ if (_short_circuit_for_null_in_probe_side) {
+ // If we use a short-circuit strategy for null value in build side
(e.g. if join operator is
+ // NULL_AWARE_LEFT_ANTI_JOIN), we should return empty block directly.
+ *eos = true;
+ return Status::OK();
+ }
size_t probe_rows = _probe_block.rows();
if ((probe_rows == 0 || _probe_index == probe_rows) && !_probe_eos) {
_probe_index = 0;
@@ -1285,7 +1317,9 @@ Status HashJoinNode::_hash_table_build(RuntimeState*
state) {
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
Block block;
- while (!eos) {
+ // If eos is reached or a null value has already been met under the short-circuit strategy,
we do not need to pull
+ // any more data.
+ while (!eos && !_short_circuit_for_null_in_probe_side) {
block.clear_column_data();
RETURN_IF_CANCELLED(state);
@@ -1315,7 +1349,7 @@ Status HashJoinNode::_hash_table_build(RuntimeState*
state) {
}
}
- if (!mutable_block.empty()) {
+ if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) {
if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) {
return Status::NotSupported(
strings::Substitute("data size of right table in hash join
> $0",
@@ -1356,7 +1390,7 @@ Status HashJoinNode::_extract_join_column(Block& block,
ColumnUInt8::MutablePtr&
DCHECK(null_map != nullptr);
VectorizedUtils::update_null_map(null_map->get_data(),
col_nullmap);
}
- if (_build_not_ignore_null[i]) {
+ if (_store_null_in_hash_table[i]) {
raw_ptrs[i] = nullable;
} else {
if constexpr (BuildSide) {
@@ -1400,7 +1434,7 @@ bool HashJoinNode::_need_null_map(Block& block, const
std::vector<int>& res_col_
auto column = block.get_by_position(res_col_ids[i]).column.get();
if constexpr (BuildSide) {
if (check_and_get_column<ColumnNullable>(*column)) {
- if (!_build_not_ignore_null[i]) {
+ if (!_store_null_in_hash_table[i]) {
return true;
}
}
@@ -1434,7 +1468,8 @@ Status HashJoinNode::_process_build_block(RuntimeState*
state, Block& block, uin
// so we have to initialize this flag by the first build block.
if (!_has_set_need_null_map_for_build) {
_has_set_need_null_map_for_build = true;
- _need_null_map_for_build = _need_null_map<true>(block, res_col_ids);
+ _need_null_map_for_build =
+ _short_circuit_for_null_in_build_side ||
_need_null_map<true>(block, res_col_ids);
}
if (_need_null_map_for_build) {
null_map_val = ColumnUInt8::create();
@@ -1458,21 +1493,24 @@ Status HashJoinNode::_process_build_block(RuntimeState*
state, Block& block, uin
std::visit(
[&](auto&& arg, auto has_null_value, auto build_unique, auto
has_runtime_filter_value,
- auto need_null_map_for_build) {
+ auto need_null_map_for_build, auto
short_circuit_for_null_in_build_side) {
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
ProcessHashTableBuild<HashTableCtxType>
hash_table_build_process(
rows, block, raw_ptrs, this, state->batch_size(),
offset);
hash_table_build_process.template
run<need_null_map_for_build, has_null_value,
- build_unique,
has_runtime_filter_value>(
- arg, need_null_map_for_build ?
&null_map_val->get_data() : nullptr);
+ build_unique,
has_runtime_filter_value,
+
short_circuit_for_null_in_build_side>(
+ arg, need_null_map_for_build ?
&null_map_val->get_data() : nullptr,
+ &_short_circuit_for_null_in_probe_side);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
}
},
_hash_table_variants, make_bool_variant(_build_side_ignore_null),
make_bool_variant(_build_unique),
make_bool_variant(has_runtime_filter),
- make_bool_variant(_need_null_map_for_build));
+ make_bool_variant(_need_null_map_for_build),
+ make_bool_variant(_short_circuit_for_null_in_build_side));
return st;
}
@@ -1488,7 +1526,7 @@ void HashJoinNode::_hash_table_init() {
JoinOpType::value ==
TJoinOp::RIGHT_OUTER_JOIN ||
JoinOpType::value ==
TJoinOp::FULL_OUTER_JOIN,
RowRefListWithFlag, RowRefList>>;
- if (_build_expr_ctxs.size() == 1 &&
!_build_not_ignore_null[0]) {
+ if (_build_expr_ctxs.size() == 1 &&
!_store_null_in_hash_table[0]) {
// Single column optimization
switch (_build_expr_ctxs[0]->root()->result_type()) {
case TYPE_BOOLEAN:
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 5f84211110..7d29268b39 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -288,7 +288,6 @@ public:
Status get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
override;
Status get_next(RuntimeState* state, Block* block, bool* eos) override;
Status close(RuntimeState* state) override;
- HashTableVariants& get_hash_table_variants() { return
_hash_table_variants; }
void init_join_op();
const RowDescriptor& row_desc() const override { return _output_row_desc; }
@@ -311,10 +310,8 @@ private:
// mark the join column whether support null eq
std::vector<bool> _is_null_safe_eq_join;
- // mark the build hash table whether contain null column
- std::vector<bool> _build_not_ignore_null;
- // mark the probe table should dispose null column
- std::vector<bool> _probe_not_ignore_null;
+ // mark the build hash table whether it needs to store null value
+ std::vector<bool> _store_null_in_hash_table;
std::vector<uint16_t> _probe_column_disguise_null;
std::vector<uint16_t> _probe_column_convert_to_null;
@@ -343,7 +340,6 @@ private:
RuntimeProfile::Counter* _join_filter_timer;
- int64_t _hash_table_rows;
int64_t _mem_used;
Arena _arena;
@@ -368,14 +364,20 @@ private:
Sizes _probe_key_sz;
Sizes _build_key_sz;
+ bool _have_other_join_conjunct;
const bool _match_all_probe; // output all rows coming from the probe
input. Full/Left Join
- const bool _match_one_build; // match at most one build row to each probe
row. Left semi Join
const bool _match_all_build; // output all rows coming from the build
input. Full/Right Join
bool _build_unique; // build a hash table without duplicated
rows. Left semi/anti Join
const bool _is_right_semi_anti;
const bool _is_outer_join;
- bool _have_other_join_conjunct = false;
+
+ // For null aware left anti join, we apply a short circuit strategy.
+ // 1. Set _short_circuit_for_null_in_build_side to true if join operator
is null aware left anti join.
+ // 2. In build phase, we stop building hash table when we meet the first
null value and set _short_circuit_for_null_in_probe_side to true.
+ // 3. In probe phase, if _short_circuit_for_null_in_probe_side is true,
the join node returns an empty block directly. Otherwise, probing continues in the
same way as for a generic left anti join.
+ bool _short_circuit_for_null_in_build_side = false;
+ bool _short_circuit_for_null_in_probe_side = false;
Block _join_block;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
index 6db551e76d..a01739a78e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/JoinOperator.java
@@ -61,7 +61,8 @@ public enum JoinOperator {
}
public boolean isSemiAntiJoin() {
- return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this ==
LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
+ return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this ==
LEFT_ANTI_JOIN
+ || this == NULL_AWARE_LEFT_ANTI_JOIN || this ==
RIGHT_ANTI_JOIN;
}
public boolean isSemiJoin() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
index 5cdb4bba36..06416c162b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/StmtRewriter.java
@@ -27,6 +27,7 @@ import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.TableAliasGenerator;
import org.apache.doris.common.UserException;
+import org.apache.doris.common.util.VectorizedUtil;
import org.apache.doris.policy.RowPolicy;
import org.apache.doris.qe.ConnectContext;
@@ -755,8 +756,8 @@ public class StmtRewriter {
// For the case of a NOT IN with an eq join conjunct, replace the
join
// conjunct with a conjunct that uses the null-matching eq
operator.
if (expr instanceof InPredicate) {
- // joinOp = JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
- joinOp = JoinOperator.LEFT_ANTI_JOIN;
+ joinOp = VectorizedUtil.isVectorized()
+ ? JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN :
JoinOperator.LEFT_ANTI_JOIN;
List<TupleId> tIds = Lists.newArrayList();
joinConjunct.getIds(tIds, null);
if (tIds.size() <= 1 ||
!tIds.contains(inlineView.getDesc().getId())) {
@@ -804,7 +805,8 @@ public class StmtRewriter {
for (int j = 0; j < tableIdx; ++j) {
TableRef tableRef = stmt.fromClause.get(j);
if (tableRef.getJoinOp() == JoinOperator.LEFT_SEMI_JOIN
- || tableRef.getJoinOp() ==
JoinOperator.LEFT_ANTI_JOIN) {
+ || tableRef.getJoinOp() == JoinOperator.LEFT_ANTI_JOIN
+ || tableRef.getJoinOp() ==
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
continue;
}
newItems.add(SelectListItem.createStarItem(tableRef.getAliasAsName()));
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
index 6a8099c719..0aa5d59291 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/TableRef.java
@@ -640,6 +640,8 @@ public class TableRef implements ParseNode, Writable {
return "FULL OUTER JOIN";
case CROSS_JOIN:
return "CROSS JOIN";
+ case NULL_AWARE_LEFT_ANTI_JOIN:
+ return "NULL AWARE LEFT ANTI JOIN";
default:
return "bad join op: " + joinOp.toString();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
index b9e3d909c1..46eebdf221 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/JoinType.java
@@ -37,6 +37,7 @@ public enum JoinType {
LEFT_ANTI_JOIN,
RIGHT_ANTI_JOIN,
CROSS_JOIN,
+ NULL_AWARE_LEFT_ANTI_JOIN,
;
private static final Map<JoinType, JoinType> joinSwapMap = ImmutableMap
@@ -71,6 +72,8 @@ public enum JoinType {
return JoinOperator.FULL_OUTER_JOIN;
case LEFT_ANTI_JOIN:
return JoinOperator.LEFT_ANTI_JOIN;
+ case NULL_AWARE_LEFT_ANTI_JOIN:
+ return JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN;
case RIGHT_ANTI_JOIN:
return JoinOperator.RIGHT_ANTI_JOIN;
case LEFT_SEMI_JOIN:
@@ -97,7 +100,8 @@ public enum JoinType {
}
public final boolean isLeftJoin() {
- return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this ==
LEFT_SEMI_JOIN;
+ return this == LEFT_OUTER_JOIN || this == LEFT_ANTI_JOIN || this ==
NULL_AWARE_LEFT_ANTI_JOIN
+ || this == LEFT_SEMI_JOIN;
}
public final boolean isRightJoin() {
@@ -117,7 +121,7 @@ public enum JoinType {
}
public final boolean isLeftSemiOrAntiJoin() {
- return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN;
+ return this == LEFT_SEMI_JOIN || this == LEFT_ANTI_JOIN || this ==
NULL_AWARE_LEFT_ANTI_JOIN;
}
public final boolean isRightSemiOrAntiJoin() {
@@ -125,7 +129,8 @@ public enum JoinType {
}
public final boolean isSemiOrAntiJoin() {
- return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this ==
LEFT_ANTI_JOIN || this == RIGHT_ANTI_JOIN;
+ return this == LEFT_SEMI_JOIN || this == RIGHT_SEMI_JOIN || this ==
LEFT_ANTI_JOIN
+ || this == NULL_AWARE_LEFT_ANTI_JOIN || this ==
RIGHT_ANTI_JOIN;
}
public final boolean isOuterJoin() {
@@ -137,7 +142,7 @@ public enum JoinType {
}
public final boolean isRemainRightJoin() {
- return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN;
+ return this != LEFT_SEMI_JOIN && this != LEFT_ANTI_JOIN && this !=
NULL_AWARE_LEFT_ANTI_JOIN;
}
public final boolean isSwapJoinType() {
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
index 1ff492d0b6..7d3a8cde41 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanColocateRule.java
@@ -27,4 +27,6 @@ public class DistributedPlanColocateRule {
public static final String COLOCATE_GROUP_IS_NOT_STABLE = "Colocate group
is not stable";
public static final String INCONSISTENT_DISTRIBUTION_OF_TABLE_AND_QUERY
= "Inconsistent distribution of table and queries";
+ public static final String NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST
+ = "Build side of null aware left anti join must be broadcast";
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
index 19e343cc72..d005a6ff3c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DistributedPlanner.java
@@ -353,7 +353,10 @@ public class DistributedPlanner {
// - and the expected size of the hash tbl doesn't exceed
autoBroadcastThreshold
// we set partition join as default when broadcast join cost equals
partition join cost
- if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN &&
node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
+ if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+ doBroadcast = true;
+ } else if (node.getJoinOp() != JoinOperator.RIGHT_OUTER_JOIN
+ && node.getJoinOp() != JoinOperator.FULL_OUTER_JOIN) {
if (node.getInnerRef().isBroadcastJoin()) {
// respect user join hint
doBroadcast = true;
@@ -425,26 +428,33 @@ public class DistributedPlanner {
/**
* Colocate Join can be performed when the following 4 conditions are met
at the same time.
- * 1. Session variables disable_colocate_plan = false
- * 2. There is no join hints in HashJoinNode
- * 3. There are no exchange node between source scan node and HashJoinNode.
- * 4. The scan nodes which are related by EqConjuncts in HashJoinNode are
colocate and group can be matched.
+ * 1. Join operator is not NULL_AWARE_LEFT_ANTI_JOIN
+ * 2. Session variables disable_colocate_plan = false
+ * 3. There is no join hints in HashJoinNode
+ * 4. There are no exchange node between source scan node and HashJoinNode.
+ * 5. The scan nodes which are related by EqConjuncts in HashJoinNode are
colocate and group can be matched.
*/
private boolean canColocateJoin(HashJoinNode node, PlanFragment
leftChildFragment, PlanFragment rightChildFragment,
List<String> cannotReason) {
// Condition1
+ if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+
cannotReason.add(DistributedPlanColocateRule.NULL_AWARE_LEFT_ANTI_JOIN_MUST_BROADCAST);
+ return false;
+ }
+
+ // Condition2
if (ConnectContext.get().getSessionVariable().isDisableColocatePlan())
{
cannotReason.add(DistributedPlanColocateRule.SESSION_DISABLED);
return false;
}
- // Condition2: If user have a join hint to use proper way of join, can
not be colocate join
+ // Condition3: If user have a join hint to use proper way of join, can
not be colocate join
if (node.getInnerRef().hasJoinHints()) {
cannotReason.add(DistributedPlanColocateRule.HAS_JOIN_HINT);
return false;
}
- // Condition3:
+ // Condition4:
// If there is an exchange node between the HashJoinNode and their
real associated ScanNode,
// it means that the data has been rehashed.
// The rehashed data can no longer be guaranteed to correspond to the
left and right buckets,
@@ -468,7 +478,7 @@ public class DistributedPlanner {
predicateList.add(eqJoinPredicate);
}
- // Condition4
+ // Condition5
return dataDistributionMatchEqPredicate(scanNodeWithJoinConjuncts,
cannotReason);
}
@@ -581,6 +591,10 @@ public class DistributedPlanner {
private boolean canBucketShuffleJoin(HashJoinNode node, PlanFragment
leftChildFragment,
List<Expr> rhsHashExprs) {
+ if (node.getJoinOp() == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) {
+ return false;
+ }
+
if
(!ConnectContext.get().getSessionVariable().isEnableBucketShuffleJoin()) {
return false;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
index 98d4a86cf7..b926c5147b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/RuntimeFilterGenerator.java
@@ -222,7 +222,8 @@ public final class RuntimeFilterGenerator {
// from the ON clause.
if (!joinNode.getJoinOp().isLeftOuterJoin()
&& !joinNode.getJoinOp().isFullOuterJoin()
- &&
!joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN)) {
+ &&
!joinNode.getJoinOp().equals(JoinOperator.LEFT_ANTI_JOIN)
+ &&
!joinNode.getJoinOp().equals(JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN)) {
joinConjuncts.addAll(joinNode.getEqJoinConjuncts());
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java
b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java
index b0d82ddbaf..2154389c6b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ExprRewriter.java
@@ -74,7 +74,9 @@ public class ExprRewriter {
case FULL_OUTER_JOIN: return FULL_OUTER_JOIN_CLAUSE;
case LEFT_SEMI_JOIN: return LEFT_SEMI_JOIN_CLAUSE;
case RIGHT_SEMI_JOIN: return RIGHT_SEMI_JOIN_CLAUSE;
- case LEFT_ANTI_JOIN: return LEFT_ANTI_JOIN_CLAUSE;
+ case NULL_AWARE_LEFT_ANTI_JOIN:
+ case LEFT_ANTI_JOIN:
+ return LEFT_ANTI_JOIN_CLAUSE;
case RIGHT_ANTI_JOIN: return RIGHT_ANTI_JOIN_CLAUSE;
case CROSS_JOIN: return CROSS_JOIN_CLAUSE;
default: return OTHER_CLAUSE;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
index 1574fd4c86..1b92475a37 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/rewrite/InferFiltersRule.java
@@ -457,7 +457,8 @@ public class InferFiltersRule implements ExprRewriteRule {
|| (joinOperator == JoinOperator.LEFT_SEMI_JOIN)
|| (!needChange && joinOperator ==
JoinOperator.RIGHT_OUTER_JOIN)
|| (needChange && (joinOperator ==
JoinOperator.LEFT_OUTER_JOIN
- || joinOperator == JoinOperator.LEFT_ANTI_JOIN))) {
+ || joinOperator == JoinOperator.LEFT_ANTI_JOIN
+ || joinOperator ==
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) {
ret = true;
}
} else if (clauseType == ExprRewriter.ClauseType.WHERE_CLAUSE) {
@@ -465,7 +466,8 @@ public class InferFiltersRule implements ExprRewriteRule {
|| (joinOperator == JoinOperator.LEFT_SEMI_JOIN
|| (needChange && joinOperator ==
JoinOperator.RIGHT_OUTER_JOIN))
|| (!needChange && (joinOperator ==
JoinOperator.LEFT_OUTER_JOIN
- || joinOperator == JoinOperator.LEFT_ANTI_JOIN))) {
+ || joinOperator == JoinOperator.LEFT_ANTI_JOIN
+ || joinOperator ==
JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN))) {
ret = true;
}
}
diff --git
a/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out
b/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out
new file mode 100644
index 0000000000..d149258eda
--- /dev/null
+++ b/regression-test/data/correctness_p0/test_null_aware_left_anti_join.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select --
+2
+
+-- !select --
+\N
+2
+
+-- !select --
+
diff --git
a/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
new file mode 100644
index 0000000000..b25e992cad
--- /dev/null
+++
b/regression-test/suites/correctness_p0/test_null_aware_left_anti_join.groovy
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_null_aware_left_anti_join") {
+ def tableName1 = "test_null_aware_left_anti_join1"
+ def tableName2 = "test_null_aware_left_anti_join2"
+ sql """
+ drop table if exists ${tableName1};
+ """
+
+ sql """
+ drop table if exists ${tableName2};
+ """
+
+ sql """
+ create table if not exists ${tableName1} ( `k1` int(11) NULL )
DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES (
"replication_num" = "1");
+ """
+
+ sql """
+ create table if not exists ${tableName2} ( `k1` int(11) NULL )
DISTRIBUTED BY HASH(`k1`) BUCKETS 4 PROPERTIES (
"replication_num" = "1");
+ """
+
+ sql """
+ insert into ${tableName1} values (1), (3);
+ """
+
+ sql """
+ insert into ${tableName2} values (1), (2);
+ """
+
+ qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in
(select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
+
+ sql """
+ insert into ${tableName2} values(null);
+ """
+
+ qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in
(select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
+
+ sql """
+ insert into ${tableName1} values(null);
+ """
+
+ qt_select """ select ${tableName2}.k1 from ${tableName2} where k1 not in
(select ${tableName1}.k1 from ${tableName1}) order by ${tableName2}.k1; """
+
+ sql """
+ drop table if exists ${tableName2};
+ """
+
+ sql """
+ drop table if exists ${tableName1};
+ """
+}
diff --git
a/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy
b/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy
index 0314b123ba..b50a8780c4 100644
--- a/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy
+++ b/regression-test/suites/tpch_sf1_p1/tpch_sf1/explain/test_q16.groovy
@@ -67,9 +67,9 @@ suite("test_explain_tpch_sf_1_q16") {
explainStr.contains("VAGGREGATE (update serialize)\n" +
" | STREAMING\n" +
" | group by: <slot 29>, <slot 30>, <slot
31>, <slot 27>") &&
- explainStr.contains("join op: LEFT ANTI JOIN(BROADCAST)[The src
data has been redistributed]\n" +
+ explainStr.contains("join op: NULL AWARE LEFT ANTI
JOIN(BROADCAST)[Build side of null aware left anti join must be broadcast]\n" +
" | equal join conjunct: <slot 21> =
`s_suppkey`") &&
- explainStr.contains("vec output tuple id: 8") &&
+ explainStr.contains("vec output tuple id: 8") &&
explainStr.contains("output slot ids: 27 29 30 31 \n" +
" | hash output slot ids: 21 23 24 25 ") &&
explainStr.contains("join op: INNER JOIN(BROADCAST)[Tables are
not in the same group]\n" +
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]