This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new bc22802b6a4 [test](ut) add cases about hash join (#49803)
bc22802b6a4 is described below
commit bc22802b6a43a6d62ac7ad1ed5000e2222976fca
Author: Jerry Hu <[email protected]>
AuthorDate: Tue Apr 15 09:28:13 2025 +0800
[test](ut) add cases about hash join (#49803)
---
be/src/pipeline/exec/hashjoin_build_sink.h | 8 -
be/src/pipeline/exec/hashjoin_probe_operator.cpp | 5 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 5 +-
be/src/runtime/runtime_state.h | 2 +-
be/src/util/stack_util.cpp | 2 +-
.../pipeline/operator/hash_join_test_helper.cpp | 630 ++++++++++++
be/test/pipeline/operator/hash_join_test_helper.h | 56 +
.../pipeline/operator/hashjoin_build_sink_test.cpp | 362 +++++++
.../operator/hashjoin_probe_operator_test.cpp | 1074 ++++++++++++++++++++
be/test/pipeline/operator/join_test_helper.cpp | 43 +
be/test/pipeline/operator/join_test_helper.h | 53 +
be/test/testutil/mock/mock_operators.h | 7 +
be/test/testutil/mock/mock_runtime_state.h | 5 +
be/test/testutil/run_all_tests.cpp | 22 +-
14 files changed, 2258 insertions(+), 16 deletions(-)
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h
b/be/src/pipeline/exec/hashjoin_build_sink.h
index c6b6cdff029..ff477cd1105 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -42,14 +42,6 @@ public:
void init_short_circuit_for_probe();
bool build_unique() const;
- std::shared_ptr<vectorized::Arena> arena() { return _shared_state->arena; }
-
- void add_hash_buckets_info(const std::string& info) const {
- _profile->add_info_string("HashTableBuckets", info);
- }
- void add_hash_buckets_filled_info(const std::string& info) const {
- _profile->add_info_string("HashTableFilledBuckets", info);
- }
Dependency* finishdependency() override { return _finish_dependency.get();
}
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
index ab52f01fa5b..6138369fe23 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.cpp
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.cpp
@@ -529,11 +529,14 @@ Status HashJoinProbeOperatorX::prepare(RuntimeState*
state) {
}
for (auto conjunct : _other_join_conjuncts) {
-
conjunct->root()->collect_slot_column_ids(_other_conjunct_refer_column_ids);
+
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
}
for (auto& conjunct : _mark_join_conjuncts) {
RETURN_IF_ERROR(conjunct->prepare(state, *_intermediate_row_desc));
+ if (_have_other_join_conjunct) {
+
conjunct->root()->collect_slot_column_ids(_should_not_lazy_materialized_column_ids);
+ }
}
RETURN_IF_ERROR(vectorized::VExpr::prepare(_probe_expr_ctxs, state,
_child->row_desc()));
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h
b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 3914fc0d58d..d1aa2280454 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -155,7 +155,8 @@ public:
bool need_finalize_variant_column() const { return
_need_finalize_variant_column; }
bool is_lazy_materialized_column(int column_id) const {
- return _have_other_join_conjunct &&
!_other_conjunct_refer_column_ids.contains(column_id);
+ return _have_other_join_conjunct &&
+ !_should_not_lazy_materialized_column_ids.contains(column_id);
}
private:
@@ -185,7 +186,7 @@ private:
std::vector<bool> _left_output_slot_flags;
std::vector<bool> _right_output_slot_flags;
bool _need_finalize_variant_column = false;
- std::set<int> _other_conjunct_refer_column_ids;
+ std::set<int> _should_not_lazy_materialized_column_ids;
std::vector<std::string> _right_table_column_names;
const std::vector<TExpr> _partition_exprs;
};
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index 5af2f643f38..4037c22d31a 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -490,7 +490,7 @@ public:
: 0;
}
- bool enable_share_hash_table_for_broadcast_join() const {
+ MOCK_FUNCTION bool enable_share_hash_table_for_broadcast_join() const {
return
_query_options.__isset.enable_share_hash_table_for_broadcast_join &&
_query_options.enable_share_hash_table_for_broadcast_join;
}
diff --git a/be/src/util/stack_util.cpp b/be/src/util/stack_util.cpp
index d84c4eb4f21..4adeb7b5548 100644
--- a/be/src/util/stack_util.cpp
+++ b/be/src/util/stack_util.cpp
@@ -45,7 +45,7 @@ std::string get_stack_trace(int start_pointers_index,
std::string dwarf_location
dwarf_location_info_mode = config::dwarf_location_info_mode;
}
#ifdef BE_TEST
- auto tool = std::string {"libunwind"};
+ auto tool = std::string {"boost"};
#else
auto tool = config::get_stack_trace_tool;
#endif
diff --git a/be/test/pipeline/operator/hash_join_test_helper.cpp
b/be/test/pipeline/operator/hash_join_test_helper.cpp
new file mode 100644
index 00000000000..532e2c55c70
--- /dev/null
+++ b/be/test/pipeline/operator/hash_join_test_helper.cpp
@@ -0,0 +1,630 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "hash_join_test_helper.h"
+
+#include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <glog/logging.h>
+
+#include <cstddef>
+#include <iostream>
+#include <sstream>
+#include <unordered_map>
+#include <vector>
+
+#include "testutil/creators.h"
+#include "testutil/mock/mock_operators.h"
+
+namespace doris::pipeline {
+// Builds a HASH_JOIN_NODE thrift plan node for unit tests and installs the matching
+// descriptor table (built by create_test_table_descriptor) into `runtime_state`.
+//
+// For every entry of `key_types` a pair of SLOT_REF expressions (left/right) is created.
+// The first `mark_join_conjuncts_size` pairs are wrapped into "eq" binary predicates and
+// stored as mark join conjuncts; the remaining pairs become equal-join conjuncts.
+//
+// @param join_op_type              join operator stored in the hash join node
+// @param key_types                 primitive type of each join key
+// @param left_keys_nullable        nullability of each left-side key (same size as key_types)
+// @param right_keys_nullable       nullability of each right-side key (same size as key_types)
+// @param is_mark_join              value stored in hash_join_node.is_mark
+// @param mark_join_conjuncts_size  number of leading keys turned into mark join conjuncts
+// @param null_safe_equal           use EQ_FOR_NULL instead of EQ for the key comparison
+// @param has_other_join_conjuncts  add two extra "slot <op> literal" conjuncts
+// @return the populated plan node (slot ids are filled in by create_test_table_descriptor)
+TPlanNode HashJoinTestHelper::create_test_plan_node(
+        const TJoinOp::type& join_op_type, const std::vector<TPrimitiveType::type>& key_types,
+        const std::vector<bool>& left_keys_nullable, const std::vector<bool>& right_keys_nullable,
+        const bool is_mark_join, const size_t mark_join_conjuncts_size, const bool null_safe_equal,
+        const bool has_other_join_conjuncts) {
+    DCHECK_EQ(key_types.size(), left_keys_nullable.size());
+    DCHECK_EQ(key_types.size(), right_keys_nullable.size());
+    DCHECK_GE(key_types.size(), mark_join_conjuncts_size);
+
+    TPlanNode tnode;
+    tnode.node_id = 0;
+    tnode.node_type = TPlanNodeType::HASH_JOIN_NODE;
+    tnode.num_children = 2;
+    tnode.__set_hash_join_node(THashJoinNode());
+    tnode.hash_join_node.join_op = join_op_type;
+    tnode.limit = -1;
+    tnode.hash_join_node.__set_is_mark(is_mark_join);
+
+    size_t mark_join_conjuncts_count = 0;
+
+    // Every slot reference created below gets its own, monotonically increasing unique id.
+    doris::TSlotId col_unique_id = 0;
+    for (size_t i = 0; i != key_types.size(); ++i) {
+        const auto key_type = key_types[i];
+        const auto left_key_nullable = left_keys_nullable[i];
+        const auto right_key_nullable = right_keys_nullable[i];
+        TEqJoinCondition eq_cond;
+        eq_cond.left = TExpr();
+        eq_cond.right = TExpr();
+        if (null_safe_equal) {
+            eq_cond.opcode = TExprOpcode::EQ_FOR_NULL;
+        } else {
+            eq_cond.opcode = TExprOpcode::EQ;
+        }
+        eq_cond.__isset.opcode = true;
+        TTypeNode type_node;
+        type_node.type = TTypeNodeType::SCALAR;
+        type_node.scalar_type.type = key_type;
+        type_node.__isset.scalar_type = true;
+
+        // Types that carry extra attributes get fixed test values for them.
+        if (key_type == TPrimitiveType::CHAR || key_type == TPrimitiveType::VARCHAR ||
+            key_type == TPrimitiveType::STRING) {
+            type_node.scalar_type.__set_len(OLAP_STRING_MAX_LENGTH);
+        } else if (key_type == TPrimitiveType::DECIMAL128I ||
+                   key_type == TPrimitiveType::DECIMAL256 ||
+                   key_type == TPrimitiveType::DECIMAL32 || key_type == TPrimitiveType::DECIMAL64 ||
+                   key_type == TPrimitiveType::DECIMALV2) {
+            type_node.scalar_type.__set_precision(18);
+            type_node.scalar_type.__set_scale(18);
+        } else if (key_type == TPrimitiveType::DATETIMEV2) {
+            type_node.scalar_type.__set_scale(6);
+        } else if (key_type == TPrimitiveType::TIMEV2) {
+            type_node.scalar_type.__set_scale(0);
+        }
+
+        eq_cond.left.nodes.emplace_back();
+        eq_cond.left.nodes[0].type.types.emplace_back(type_node);
+        eq_cond.left.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        eq_cond.left.nodes[0].__set_is_nullable(left_key_nullable);
+        eq_cond.left.nodes[0].num_children = 0;
+        eq_cond.left.nodes[0].slot_ref.col_unique_id = col_unique_id++;
+        eq_cond.left.nodes[0].__isset.slot_ref = true;
+
+        eq_cond.right.nodes.emplace_back();
+        eq_cond.right.nodes[0].type.types.emplace_back(type_node);
+        eq_cond.right.nodes[0].node_type = TExprNodeType::SLOT_REF;
+        eq_cond.right.nodes[0].__set_is_nullable(right_key_nullable);
+        eq_cond.right.nodes[0].num_children = 0;
+        eq_cond.right.nodes[0].slot_ref.col_unique_id = col_unique_id++;
+        eq_cond.right.nodes[0].__isset.slot_ref = true;
+
+        if (mark_join_conjuncts_count < mark_join_conjuncts_size) {
+            // Leading keys are expressed as "left eq right" predicates over the two slot refs.
+            TExpr mark_join_cond;
+            mark_join_cond.nodes.emplace_back();
+            mark_join_cond.nodes[0].node_type = TExprNodeType::BINARY_PRED;
+            mark_join_cond.nodes[0].opcode = TExprOpcode::EQ;
+            mark_join_cond.nodes[0].type.types.emplace_back();
+            mark_join_cond.nodes[0].type.types[0].scalar_type.type = TPrimitiveType::BOOLEAN;
+            mark_join_cond.nodes[0].type.types[0].__isset.scalar_type = true;
+            mark_join_cond.nodes[0].__set_is_nullable(true);
+            mark_join_cond.nodes[0].num_children = 2;
+
+            mark_join_cond.nodes[0].fn.name.function_name = "eq";
+            mark_join_cond.nodes[0].__isset.fn = true;
+
+            mark_join_cond.nodes.emplace_back();
+            mark_join_cond.nodes[1] = eq_cond.left.nodes[0];
+            mark_join_cond.nodes.emplace_back();
+            mark_join_cond.nodes[2] = eq_cond.right.nodes[0];
+
+            tnode.hash_join_node.mark_join_conjuncts.emplace_back(mark_join_cond);
+            tnode.hash_join_node.__isset.mark_join_conjuncts = true;
+
+            ++mark_join_conjuncts_count;
+        } else {
+            tnode.hash_join_node.eq_join_conjuncts.push_back(eq_cond);
+        }
+    }
+
+    if (has_other_join_conjuncts) {
+        // Two conjuncts of the shape "<nullable INT slot> <op> <INT literal>" are added:
+        // the first compares against 100, the second (a modified copy) against 50.
+        // NOTE(review): the predicate carries function name "gt" but opcode EQ; this looks
+        // like a leftover from the "eq" predicate above -- confirm which one is intended.
+        TExpr other_join_cond;
+        other_join_cond.nodes.emplace_back();
+        other_join_cond.nodes[0].node_type = TExprNodeType::BINARY_PRED;
+        other_join_cond.nodes[0].opcode = TExprOpcode::EQ;
+        other_join_cond.nodes[0].type.types.emplace_back();
+        other_join_cond.nodes[0].type.types[0].scalar_type.type = TPrimitiveType::BOOLEAN;
+        other_join_cond.nodes[0].type.types[0].__isset.scalar_type = true;
+        other_join_cond.nodes[0].__set_is_nullable(true);
+        other_join_cond.nodes[0].num_children = 2;
+
+        other_join_cond.nodes[0].fn.name.function_name = "gt";
+        other_join_cond.nodes[0].__isset.fn = true;
+
+        other_join_cond.nodes.emplace_back();
+        other_join_cond.nodes[1].node_type = TExprNodeType::SLOT_REF;
+        other_join_cond.nodes[1].type.types.emplace_back();
+        other_join_cond.nodes[1].type.types[0].scalar_type.type = TPrimitiveType::INT;
+        other_join_cond.nodes[1].__set_is_nullable(true);
+        other_join_cond.nodes[1].type.types[0].__isset.scalar_type = true;
+        other_join_cond.nodes[1].num_children = 0;
+        other_join_cond.nodes[1].slot_ref.col_unique_id = col_unique_id++;
+        other_join_cond.nodes[1].__isset.slot_ref = true;
+
+        other_join_cond.nodes.emplace_back();
+        other_join_cond.nodes[2].node_type = TExprNodeType::INT_LITERAL;
+        other_join_cond.nodes[2].type.types.emplace_back();
+        other_join_cond.nodes[2].type.types[0].scalar_type.type = TPrimitiveType::INT;
+        other_join_cond.nodes[2].type.types[0].__isset.scalar_type = true;
+        other_join_cond.nodes[2].num_children = 0;
+        other_join_cond.nodes[2].int_literal.value = 100;
+        other_join_cond.nodes[2].__isset.int_literal = true;
+
+        tnode.hash_join_node.other_join_conjuncts.push_back(other_join_cond);
+
+        other_join_cond.nodes[2].int_literal.value = 50;
+        other_join_cond.nodes[1].slot_ref.col_unique_id = col_unique_id++;
+        tnode.hash_join_node.other_join_conjuncts.push_back(other_join_cond);
+        tnode.hash_join_node.__isset.other_join_conjuncts = true;
+    }
+
+    tnode.row_tuples.push_back(0);
+    tnode.row_tuples.push_back(1);
+    tnode.nullable_tuples.push_back(false);
+    tnode.nullable_tuples.push_back(false);
+    tnode.__isset.hash_join_node = true;
+
+    tnode.hash_join_node.vintermediate_tuple_id_list.emplace_back(2);
+    tnode.hash_join_node.__isset.vintermediate_tuple_id_list = true;
+
+    auto desc_table = create_test_table_descriptor(tnode);
+    auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl);
+    // Surface a failed descriptor table build here instead of letting a test continue
+    // with a descriptor table that was never created.
+    EXPECT_TRUE(st.ok()) << "DescriptorTbl::create failed: " << st.to_string();
+    DCHECK(!desc_table.slotDescriptors.empty());
+
+    runtime_state->set_desc_tbl(desc_tbl);
+
+    return tnode;
+}
+
+// Builds the thrift descriptor table that matches `tnode` and patches `tnode` so that it
+// refers to the created slots.
+//
+// Four tuples are built, in this order: left (probe) input, right (build) input,
+// intermediate and output. The output tuple is then referenced as tuple id 3
+// (presumably ids follow the build order -- verify against TDescriptorTableBuilder).
+//
+// Side effects on `tnode`:
+//  - `projections` is filled with one SLOT_REF expression per output slot;
+//  - `output_tuple_id` / `hash_join_node.voutput_tuple_id` are set to 3;
+//  - the slot ids of all slot refs in eq/mark/other join conjuncts and projections are
+//    resolved from their col_unique_id;
+//  - `hash_join_node.hash_output_slot_ids` is filled.
+//
+// The order in which `col_unique_id` values are handed out is significant: slots are later
+// looked up by unique id, so statements must not be reordered.
+TDescriptorTable HashJoinTestHelper::create_test_table_descriptor(TPlanNode& tnode) {
+    TTupleDescriptorBuilder left_tuple_builder, right_tuple_builder, intermediate_tuple_builder,
+            output_tuple_builder;
+
+    auto& join_node = tnode.hash_join_node;
+    const auto join_op = join_node.join_op;
+
+    const auto is_left_half_join = join_op == TJoinOp::LEFT_SEMI_JOIN ||
+                                   join_op == TJoinOp::LEFT_ANTI_JOIN ||
+                                   join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
+                                   join_op == TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN;
+    const auto is_right_half_join =
+            join_op == TJoinOp::RIGHT_ANTI_JOIN || join_op == TJoinOp::RIGHT_SEMI_JOIN;
+    const auto is_half_join = is_left_half_join || is_right_half_join;
+
+    // Left-side slots get null-indicator byte/bit 0 for FULL/RIGHT OUTER joins,
+    // right-side slots for FULL/LEFT OUTER joins.
+    const bool left_forced_nullable =
+            join_op == TJoinOp::FULL_OUTER_JOIN || join_op == TJoinOp::RIGHT_OUTER_JOIN;
+    const bool right_forced_nullable =
+            join_op == TJoinOp::FULL_OUTER_JOIN || join_op == TJoinOp::LEFT_OUTER_JOIN;
+
+    const auto has_other_join_conjuncts = !join_node.other_join_conjuncts.empty();
+    const auto intermediate_need_output_left_side = !is_half_join || is_left_half_join ||
+                                                    !join_node.mark_join_conjuncts.empty() ||
+                                                    has_other_join_conjuncts;
+    const auto need_output_left_side = !is_half_join || is_left_half_join;
+
+    const auto intermediate_need_output_right_side = !is_half_join || is_right_half_join ||
+                                                     !join_node.mark_join_conjuncts.empty() ||
+                                                     has_other_join_conjuncts;
+    const auto need_output_right_side = !is_half_join || is_right_half_join;
+
+    // The "other" conjuncts contribute a single slot per side, however many there are.
+    const auto other_conjuncts_count = join_node.other_join_conjuncts.empty() ? 0 : 1;
+
+    const auto keys_count = join_node.eq_join_conjuncts.size();
+    const auto mark_keys_count = join_node.mark_join_conjuncts.size();
+    const auto slots_per_side = keys_count + mark_keys_count + other_conjuncts_count;
+
+    const auto total_output_slots_count = (need_output_left_side ? slots_per_side : 0) +
+                                          (need_output_right_side ? slots_per_side : 0);
+    const auto total_intermediate_output_slots_count =
+            (intermediate_need_output_left_side ? slots_per_side : 0) +
+            (intermediate_need_output_right_side ? slots_per_side : 0);
+    std::vector<TSlotDescriptor> intermediate_slots(total_intermediate_output_slots_count);
+    std::vector<TSlotDescriptor> output_slots(total_output_slots_count);
+    // Right-side slots are stored after all left-side slots (when the left side is present).
+    const auto intermediate_keys_offset = intermediate_need_output_left_side ? slots_per_side : 0;
+    const auto keys_offset = need_output_left_side ? slots_per_side : 0;
+
+    // Builds a slot descriptor with the type and nullability of the slot ref `ref`.
+    // The col_unique_id is left for the caller to assign.
+    const auto make_slot = [&](const TExprNode& ref) {
+        auto slot = TSlotDescriptorBuilder()
+                            .type(thrift_to_type(ref.type.types[0].scalar_type.type))
+                            .nullable(ref.is_nullable)
+                            .build();
+        slot.slotType.types[0].scalar_type = ref.type.types[0].scalar_type;
+        return slot;
+    };
+
+    // Sets the null-indicator byte/bit of `slot` to 0.
+    const auto force_nullable = [](auto& slot) {
+        slot.nullIndicatorByte = 0;
+        slot.nullIndicatorBit = 0;
+    };
+
+    // True when both null-indicator fields of `slot` are 0; used as projection nullability.
+    const auto has_zero_null_indicator = [](const auto& slot) {
+        return slot.nullIndicatorBit == 0 && slot.nullIndicatorByte == 0;
+    };
+
+    // Appends to `expr` a SLOT_REF node that refers to `slot` (by col_unique_id and id)
+    // and carries the slot's scalar type.
+    const auto append_slot_ref = [](TExpr& expr, const auto& slot, const bool nullable) {
+        auto& node = expr.nodes.emplace_back();
+        node.node_type = TExprNodeType::SLOT_REF;
+        node.__set_is_nullable(nullable);
+        node.num_children = 0;
+        node.__isset.slot_ref = true;
+        node.slot_ref.col_unique_id = slot.col_unique_id;
+        node.slot_ref.slot_id = slot.id;
+        node.slot_ref.__isset.col_unique_id = true;
+        node.type.types.emplace_back();
+        node.type.types[0].scalar_type = slot.slotType.types[0].scalar_type;
+        node.type.types[0].__isset.scalar_type = true;
+    };
+
+    size_t slots_count = 0;
+    // Unique ids generated here start at 200; the ids stored in the conjuncts of `tnode`
+    // are reused as-is where a slot must be found through a conjunct's slot ref.
+    doris::TSlotId col_unique_id = 200;
+    std::vector<doris::TSlotId> hash_output_unique_ids;
+    std::vector<TExpr> projection_exprs(total_output_slots_count);
+
+    // Mark join conjuncts: the intermediate slot keeps the conjunct's unique id, while the
+    // output slot and the input tuple slot each get a fresh one.
+    for (auto& mark_cond : join_node.mark_join_conjuncts) {
+        const auto& left_ref = mark_cond.nodes[1];
+        const auto& right_ref = mark_cond.nodes[2];
+
+        auto slot = make_slot(left_ref);
+        slot.col_unique_id = left_ref.slot_ref.col_unique_id;
+        if (left_forced_nullable) {
+            force_nullable(slot);
+        }
+
+        intermediate_slots[slots_count] = slot;
+        if (need_output_left_side) {
+            append_slot_ref(projection_exprs[slots_count], slot, has_zero_null_indicator(slot));
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count] = slot;
+        }
+
+        slot.col_unique_id = col_unique_id++;
+        left_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+
+        slot = make_slot(right_ref);
+        slot.col_unique_id = right_ref.slot_ref.col_unique_id;
+        if (right_forced_nullable) {
+            force_nullable(slot);
+        }
+        intermediate_slots[slots_count + intermediate_keys_offset] = slot;
+        if (need_output_right_side) {
+            append_slot_ref(projection_exprs[slots_count + keys_offset], slot,
+                            has_zero_null_indicator(slot));
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count + keys_offset] = slot;
+        }
+        slot.col_unique_id = col_unique_id++;
+        right_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+        ++slots_count;
+    }
+
+    // Equal join conjuncts: the input tuple slot keeps the conjunct's unique id, while the
+    // intermediate and output slots each get a fresh one.
+    for (auto& eq_cond : join_node.eq_join_conjuncts) {
+        const auto& left_ref = eq_cond.left.nodes[0];
+        const auto& right_ref = eq_cond.right.nodes[0];
+
+        auto slot = make_slot(left_ref);
+        slot.col_unique_id = left_ref.slot_ref.col_unique_id;
+        left_tuple_builder.add_slot(slot);
+        if (intermediate_need_output_left_side || need_output_left_side) {
+            hash_output_unique_ids.emplace_back(slot.col_unique_id);
+        }
+
+        slot.col_unique_id = col_unique_id++;
+        if (left_forced_nullable) {
+            force_nullable(slot);
+        }
+
+        if (intermediate_need_output_left_side) {
+            intermediate_slots[slots_count] = slot;
+        }
+
+        if (need_output_left_side) {
+            append_slot_ref(projection_exprs[slots_count], slot, has_zero_null_indicator(slot));
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count] = slot;
+        }
+
+        slot = make_slot(right_ref);
+        slot.col_unique_id = right_ref.slot_ref.col_unique_id;
+        right_tuple_builder.add_slot(slot);
+        if (need_output_right_side || intermediate_need_output_right_side) {
+            hash_output_unique_ids.emplace_back(slot.col_unique_id);
+        }
+        slot.col_unique_id = col_unique_id++;
+        if (right_forced_nullable) {
+            force_nullable(slot);
+        }
+
+        if (intermediate_need_output_right_side) {
+            intermediate_slots[slots_count + intermediate_keys_offset] = slot;
+        }
+        if (need_output_right_side) {
+            append_slot_ref(projection_exprs[slots_count + keys_offset], slot,
+                            has_zero_null_indicator(slot));
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count + keys_offset] = slot;
+        }
+        ++slots_count;
+    }
+
+    // Other join conjuncts: only the exact pair produced by create_test_plan_node is
+    // handled; the first conjunct reads a left-side slot, the second a right-side slot.
+    if (join_node.other_join_conjuncts.size() == 2) {
+        auto& left_cond = join_node.other_join_conjuncts[0];
+        auto& right_cond = join_node.other_join_conjuncts[1];
+
+        auto slot = make_slot(left_cond.nodes[1]);
+
+        slot.col_unique_id = col_unique_id++;
+        left_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+
+        slot.col_unique_id = left_cond.nodes[1].slot_ref.col_unique_id;
+        if (left_forced_nullable) {
+            force_nullable(slot);
+        }
+
+        intermediate_slots[slots_count] = slot;
+
+        if (need_output_left_side) {
+            append_slot_ref(projection_exprs[slots_count], slot, has_zero_null_indicator(slot));
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count] = slot;
+        }
+
+        slot = make_slot(right_cond.nodes[1]);
+
+        slot.col_unique_id = col_unique_id++;
+        right_tuple_builder.add_slot(slot);
+        hash_output_unique_ids.emplace_back(slot.col_unique_id);
+
+        slot.col_unique_id = right_cond.nodes[1].slot_ref.col_unique_id;
+        if (right_forced_nullable) {
+            force_nullable(slot);
+        }
+
+        intermediate_slots[slots_count + intermediate_keys_offset] = slot;
+
+        if (need_output_right_side) {
+            append_slot_ref(projection_exprs[slots_count + keys_offset], slot,
+                            has_zero_null_indicator(slot));
+
+            slot.col_unique_id = col_unique_id++;
+            output_slots[slots_count + keys_offset] = slot;
+        }
+
+        ++slots_count;
+    }
+
+    TDescriptorTableBuilder builder;
+
+    left_tuple_builder.build(&builder);
+    right_tuple_builder.build(&builder);
+
+    for (auto& slot : intermediate_slots) {
+        intermediate_tuple_builder.add_slot(slot);
+    }
+
+    for (auto& slot : output_slots) {
+        output_tuple_builder.add_slot(slot);
+    }
+
+    if (join_node.is_mark) {
+        // A mark join carries one extra nullable BOOLEAN slot at the end of both the
+        // intermediate and the output tuple, plus a projection that forwards it.
+        auto type = thrift_to_type(TPrimitiveType::BOOLEAN);
+        auto slot = TSlotDescriptorBuilder().type(type).nullable(true).build();
+        slot.col_unique_id = col_unique_id++;
+        intermediate_tuple_builder.add_slot(slot);
+
+        append_slot_ref(projection_exprs.emplace_back(), slot, true);
+
+        slot.col_unique_id = col_unique_id++;
+        output_tuple_builder.add_slot(slot);
+    }
+
+    tnode.projections = projection_exprs;
+    tnode.__isset.projections = true;
+
+    intermediate_tuple_builder.build(&builder);
+
+    output_tuple_builder.build(&builder);
+    tnode.__set_output_tuple_id(3);
+    join_node.__set_voutput_tuple_id(3);
+
+    auto table_desc = builder.desc_tbl();
+
+    // Resolve slot ids: every slot ref created so far only knows its col_unique_id.
+    std::unordered_map<int32_t, TTupleId> slots_map;
+    for (auto& slot : table_desc.slotDescriptors) {
+        slots_map[slot.col_unique_id] = slot.id;
+    }
+
+    for (auto& eq_cond : join_node.eq_join_conjuncts) {
+        auto& left_slot_ref = eq_cond.left.nodes[0].slot_ref;
+        left_slot_ref.slot_id = slots_map[left_slot_ref.col_unique_id];
+
+        auto& right_slot_ref = eq_cond.right.nodes[0].slot_ref;
+        right_slot_ref.slot_id = slots_map[right_slot_ref.col_unique_id];
+    }
+
+    for (size_t i = 0; i != mark_keys_count; ++i) {
+        auto& slot_ref = join_node.mark_join_conjuncts[i].nodes[1].slot_ref;
+        slot_ref.slot_id = slots_map[slot_ref.col_unique_id];
+
+        auto& slot_ref_right = join_node.mark_join_conjuncts[i].nodes[2].slot_ref;
+        slot_ref_right.slot_id = slots_map[slot_ref_right.col_unique_id];
+    }
+
+    for (auto& expr : tnode.projections) {
+        for (auto& node : expr.nodes) {
+            if (node.node_type == TExprNodeType::SLOT_REF) {
+                node.slot_ref.slot_id = slots_map[node.slot_ref.col_unique_id];
+            }
+        }
+    }
+
+    if (join_node.other_join_conjuncts.size() == 2) {
+        auto& left_cond = join_node.other_join_conjuncts[0];
+        auto& right_cond = join_node.other_join_conjuncts[1];
+
+        left_cond.nodes[1].slot_ref.slot_id = slots_map[left_cond.nodes[1].slot_ref.col_unique_id];
+        right_cond.nodes[1].slot_ref.slot_id =
+                slots_map[right_cond.nodes[1].slot_ref.col_unique_id];
+    }
+
+    for (auto uid : hash_output_unique_ids) {
+        join_node.hash_output_slot_ids.emplace_back(slots_map[uid]);
+    }
+    join_node.__isset.hash_output_slot_ids = true;
+
+    return table_desc;
+}
+
+// Appends `conjuncts` to the mark join conjuncts of `join_node` and flags the field as set.
+// `join_node` is expected to already carry a hash join node.
+void HashJoinTestHelper::add_mark_join_conjuncts(TPlanNode& join_node,
+                                                 std::vector<TExpr>& conjuncts) {
+    EXPECT_TRUE(join_node.__isset.hash_join_node);
+
+    auto& hash_join = join_node.hash_join_node;
+    hash_join.__isset.mark_join_conjuncts = true;
+
+    auto& mark_conjuncts = hash_join.mark_join_conjuncts;
+    mark_conjuncts.insert(mark_conjuncts.end(), conjuncts.begin(), conjuncts.end());
+}
+
+// Appends `conjuncts` to the other join conjuncts of `join_node` and flags the field as set.
+// `join_node` is expected to already carry a hash join node.
+void HashJoinTestHelper::add_other_join_conjuncts(TPlanNode& join_node,
+                                                  std::vector<TExpr>& conjuncts) {
+    EXPECT_TRUE(join_node.__isset.hash_join_node);
+
+    auto& hash_join = join_node.hash_join_node;
+    hash_join.__isset.other_join_conjuncts = true;
+
+    auto& other_conjuncts = hash_join.other_join_conjuncts;
+    other_conjuncts.insert(other_conjuncts.end(), conjuncts.begin(), conjuncts.end());
+}
+
+// Creates the probe operator and the build sink operator for `tnode`, wires them to mock
+// children and creates `pipeline_task` for the resulting source pipeline.
+//
+// The build side child exposes tuple 1, the probe side child exposes tuple 0.
+// Returns {probe operator, build sink operator}.
+std::pair<std::shared_ptr<HashJoinProbeOperatorX>, std::shared_ptr<HashJoinBuildSinkOperatorX>>
+HashJoinTestHelper::create_operators(const TPlanNode& tnode) {
+    const auto& descriptors = runtime_state->desc_tbl();
+
+    auto build_sink =
+            std::make_shared<HashJoinBuildSinkOperatorX>(obj_pool.get(), 0, 0, tnode, descriptors);
+    auto probe = std::make_shared<HashJoinProbeOperatorX>(obj_pool.get(), tnode, 0, descriptors);
+
+    auto build_child = std::make_shared<MockSourceOperator>();
+    auto probe_child = std::make_shared<MockSourceOperator>();
+    auto probe_side_sink = std::make_shared<MockSinkOperator>();
+
+    build_child->_row_descriptor = RowDescriptor(descriptors, {1}, {false});
+    probe_child->_row_descriptor = RowDescriptor(descriptors, {0}, {false});
+
+    // The probe operator takes two children: first the probe side, then the build side.
+    EXPECT_TRUE(build_sink->set_child(build_child));
+    EXPECT_TRUE(probe->set_child(probe_child));
+    EXPECT_TRUE(probe->set_child(build_child));
+
+    // Setup task and state
+    std::map<int,
+             std::pair<std::shared_ptr<BasicSharedState>, std::vector<std::shared_ptr<Dependency>>>>
+            shared_state_map;
+    auto [source_pipeline, _] =
+            generate_sort_pipeline(probe, probe_side_sink, build_sink, build_child);
+    pipeline_task = std::make_shared<PipelineTask>(source_pipeline, 0, runtime_state.get(),
+                                                   nullptr, nullptr, shared_state_map, 0);
+
+    return {std::move(probe), std::move(build_sink)};
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/hash_join_test_helper.h
b/be/test/pipeline/operator/hash_join_test_helper.h
new file mode 100644
index 00000000000..355368a37d0
--- /dev/null
+++ b/be/test/pipeline/operator/hash_join_test_helper.h
@@ -0,0 +1,56 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/Exprs_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <cstddef>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "join_test_helper.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/hashjoin_probe_operator.h"
+
+namespace doris::pipeline {
+// Helper shared by the hash join build sink / probe operator unit tests. It creates thrift
+// plan nodes and descriptor tables for a hash join and instantiates the two operators.
+class HashJoinTestHelper : public JoinTestHelper {
+public:
+    // Builds a HASH_JOIN_NODE plan node with one pair of key slot refs per entry of
+    // `key_types` and installs the matching descriptor table into the runtime state.
+    // The first `mark_join_conjuncts_size` keys become mark join conjuncts, the rest
+    // equal-join conjuncts. `left_keys_nullable` and `right_keys_nullable` must have
+    // the same size as `key_types`.
+    TPlanNode create_test_plan_node(const TJoinOp::type& join_op_type,
+                                    const std::vector<TPrimitiveType::type>& key_types,
+                                    const std::vector<bool>& left_keys_nullable,
+                                    const std::vector<bool>& right_keys_nullable,
+                                    const bool is_mark_join = false,
+                                    const size_t mark_join_conjuncts_size = 0,
+                                    const bool null_safe_equal = false,
+                                    const bool has_other_join_conjuncts = false);
+
+    // Builds the descriptor table (left, right, intermediate and output tuples) for `tnode`
+    // and updates `tnode` in place: projections, output tuple id, hash output slot ids and
+    // the slot ids of the slot refs inside its conjuncts.
+    TDescriptorTable create_test_table_descriptor(TPlanNode& tnode);
+
+    // Append `conjuncts` to the mark / other join conjuncts of `join_node`, which must
+    // already carry a hash join node.
+    void add_mark_join_conjuncts(TPlanNode& join_node, std::vector<TExpr>& conjuncts);
+    void add_other_join_conjuncts(TPlanNode& join_node, std::vector<TExpr>& conjuncts);
+
+    // Creates the probe operator and the build sink operator for `tnode`, attaches mock
+    // child operators and sets up the pipeline task. Returns {probe, build sink}.
+    std::pair<std::shared_ptr<HashJoinProbeOperatorX>, std::shared_ptr<HashJoinBuildSinkOperatorX>>
+    create_operators(const TPlanNode& tnode);
+
+protected:
+    // NOTE(review): not referenced by the member functions declared above; presumably
+    // reserved for the tests using this helper -- confirm before removing.
+    std::vector<TExprNode*> _left_slots, _right_slots;
+};
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/hashjoin_build_sink_test.cpp
b/be/test/pipeline/operator/hashjoin_build_sink_test.cpp
new file mode 100644
index 00000000000..7d60834b138
--- /dev/null
+++ b/be/test/pipeline/operator/hashjoin_build_sink_test.cpp
@@ -0,0 +1,362 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "pipeline/exec/hashjoin_build_sink.h"
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+#include <vector>
+
+#include "common/config.h"
+#include "hash_join_test_helper.h"
+#include "pipeline/exec/operator.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/descriptors.h"
+#include "runtime/runtime_state.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/core/block.h"
+#include "vec/exprs/vexpr_context.h"
+
+namespace doris::pipeline {
+
+// Fixture for the build sink tests. Set-up and tear-down are delegated to
+// HashJoinTestHelper; run_test_block() drives a test body over a matrix of
+// join types, key types and nullability combinations.
+class HashJoinBuildSinkTest : public testing::Test {
+public:
+ void SetUp() override { _helper.SetUp(); }
+ void TearDown() override { _helper.TearDown(); }
+
+ // Calls `test_block(op_type, key_types, left_nullables, right_nullables)`
+ // for every listed join type combined with
+ // - each single key type and all four left/right nullability pairs, and
+ // - each ordered pair of key types, again with four nullability pairs.
+ // Note: a fatal gtest assertion inside `test_block` only returns from that
+ // single invocation; the loops here keep iterating.
+ template <typename Func>
+ void run_test_block(Func test_block) {
+ auto testing_join_ops = {TJoinOp::INNER_JOIN,
+ TJoinOp::LEFT_OUTER_JOIN,
+ TJoinOp::RIGHT_OUTER_JOIN,
+ TJoinOp::FULL_OUTER_JOIN,
+ TJoinOp::LEFT_SEMI_JOIN,
+ TJoinOp::RIGHT_SEMI_JOIN,
+ TJoinOp::LEFT_ANTI_JOIN,
+ TJoinOp::RIGHT_ANTI_JOIN,
+ TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
+ TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN};
+ auto testing_key_types = {
+ TPrimitiveType::BOOLEAN, TPrimitiveType::TINYINT,
TPrimitiveType::SMALLINT,
+ TPrimitiveType::INT, TPrimitiveType::BIGINT,
TPrimitiveType::FLOAT,
+ TPrimitiveType::DOUBLE, TPrimitiveType::DATE,
TPrimitiveType::DATETIME,
+ TPrimitiveType::BINARY, TPrimitiveType::CHAR,
TPrimitiveType::LARGEINT,
+ TPrimitiveType::VARCHAR, TPrimitiveType::DECIMALV2,
TPrimitiveType::TIME,
+ TPrimitiveType::STRING, TPrimitiveType::DATEV2,
TPrimitiveType::DATETIMEV2,
+ TPrimitiveType::TIMEV2, TPrimitiveType::DECIMAL32,
TPrimitiveType::DECIMAL64,
+ TPrimitiveType::DECIMAL128I, TPrimitiveType::DECIMAL256,
TPrimitiveType::IPV4,
+ TPrimitiveType::IPV6};
+
+ for (const auto& op_type : testing_join_ops) {
+ for (const auto key_type : testing_key_types) {
+ // Single-key joins.
+ for (auto left_nullable : {true, false}) {
+ for (auto right_nullable : {true, false}) {
+ test_block(op_type, {key_type}, {left_nullable},
{right_nullable});
+ }
+ }
+
+ // Two-key joins: the second key uses the mirrored nullability of
+ // the first one on each side.
+ for (const auto key_type2 : testing_key_types) {
+ for (auto left_nullable : {true, false}) {
+ for (auto right_nullable : {true, false}) {
+ test_block(op_type, {key_type, key_type2},
+ {left_nullable, right_nullable},
+ {right_nullable, left_nullable});
+ }
+ }
+ }
+ }
+ }
+ }
+
+protected:
+ // Owns the query context, runtime profile and descriptor table used by the tests.
+ HashJoinTestHelper _helper;
+};
+
+// Walks the build sink and the probe operator through their lifecycle
+// (init -> prepare -> setup_local_state -> open -> close) for every
+// combination produced by run_test_block() and expects each step to succeed.
+TEST_F(HashJoinBuildSinkTest, Init) {
+    auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types,
+                          const std::vector<bool>& left_nullables,
+                          const std::vector<bool>& right_nullables) {
+        auto tnode =
+                _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables);
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        // Every combination gets its own runtime state bound to the shared query context.
+        auto runtime_state = std::make_unique<MockRuntimeState>();
+        runtime_state->_query_ctx = _helper.query_ctx.get();
+        runtime_state->_query_id = _helper.query_ctx->query_id();
+        runtime_state->resize_op_id_to_local_state(-100);
+        runtime_state->set_max_operator_id(-100);
+        runtime_state->set_desc_tbl(_helper.desc_tbl);
+
+        auto st = sink_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        auto shared_state = sink_operator->create_shared_state();
+        ASSERT_TRUE(shared_state);
+
+        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                std::vector<std::shared_ptr<Dependency>>>>
+                shared_state_map;
+        LocalSinkStateInfo info {.task_idx = 0,
+                                 .parent_profile = _helper.runtime_profile.get(),
+                                 .sender_id = 0,
+                                 .shared_state = shared_state.get(),
+                                 .shared_state_map = shared_state_map,
+                                 .tsink = TDataSink()};
+        st = sink_operator->setup_local_state(runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        st = probe_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+        st = probe_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        // The probe side shares the state object created by the sink.
+        LocalStateInfo info2 {.parent_profile = _helper.runtime_profile.get(),
+                              .scan_ranges = {},
+                              .shared_state = shared_state.get(),
+                              .shared_state_map = shared_state_map,
+                              .task_idx = 0};
+
+        st = probe_operator->setup_local_state(runtime_state.get(), info2);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        auto* probe_local_state = runtime_state->get_local_state(probe_operator->operator_id());
+        ASSERT_TRUE(probe_local_state);
+
+        st = probe_local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        // assert_cast is the checked downcast already used elsewhere in this file;
+        // reinterpret_cast is not a valid way to navigate a class hierarchy.
+        auto* local_state = assert_cast<HashJoinBuildSinkLocalState*>(
+                runtime_state->get_sink_local_state());
+        ASSERT_TRUE(local_state);
+
+        st = local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        ASSERT_GT(sink_operator->get_memory_usage_debug_str(runtime_state.get()).size(), 0);
+
+        st = local_state->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = sink_operator->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = probe_operator->close(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = probe_local_state->close(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    };
+
+    run_test_block(test_block);
+}
+
+// Sinks one two-row block followed by an empty eos block into the build sink
+// and checks the reported reserve size and memory usage around those calls,
+// for every combination produced by run_test_block().
+TEST_F(HashJoinBuildSinkTest, Sink) {
+    auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types,
+                          const std::vector<bool>& left_nullables,
+                          const std::vector<bool>& right_nullables) {
+        auto tnode =
+                _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables);
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        auto runtime_state = std::make_unique<MockRuntimeState>();
+        runtime_state->_query_ctx = _helper.query_ctx.get();
+        runtime_state->_query_id = _helper.query_ctx->query_id();
+        runtime_state->resize_op_id_to_local_state(-100);
+        runtime_state->set_max_operator_id(-100);
+        runtime_state->set_desc_tbl(_helper.desc_tbl);
+
+        // Initialising from a TDataSink is expected to be rejected by this operator.
+        auto st = sink_operator->init(TDataSink());
+        ASSERT_FALSE(st.ok());
+
+        st = sink_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        auto shared_state = sink_operator->create_shared_state();
+        ASSERT_TRUE(shared_state);
+
+        shared_state->create_source_dependency(sink_operator->operator_id(), tnode.node_id,
+                                               "HashJoinSinkTestDep");
+
+        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                std::vector<std::shared_ptr<Dependency>>>>
+                shared_state_map;
+        LocalSinkStateInfo info {.task_idx = 0,
+                                 .parent_profile = _helper.runtime_profile.get(),
+                                 .sender_id = 0,
+                                 .shared_state = shared_state.get(),
+                                 .shared_state_map = shared_state_map,
+                                 .tsink = TDataSink()};
+        st = sink_operator->setup_local_state(runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        // assert_cast is the checked downcast already used elsewhere in this file;
+        // reinterpret_cast is not a valid way to navigate a class hierarchy.
+        auto* local_state = assert_cast<HashJoinBuildSinkLocalState*>(
+                runtime_state->get_sink_local_state());
+        ASSERT_TRUE(local_state);
+
+        st = local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        // Nothing has been sunk yet, so no memory needs to be reserved.
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), false), 0);
+
+        const auto& row_desc = sink_operator->child()->row_desc();
+        vectorized::Block block(row_desc.tuple_descriptors()[0]->slots(), 0);
+
+        // Give every column two rows: one default value plus one extra element
+        // (inserted through the nullable interface for nullable columns).
+        auto mutable_block = vectorized::MutableBlock(block.clone_empty());
+        for (auto& col : mutable_block.mutable_columns()) {
+            col->insert_default();
+            if (col->is_nullable()) {
+                auto& nullable_column = assert_cast<vectorized::ColumnNullable&>(*col);
+                nullable_column.insert_not_null_elements(1);
+            } else {
+                col->insert_default();
+            }
+        }
+
+        auto block2 = mutable_block.to_block();
+        st = sink_operator->sink(runtime_state.get(), &block2, false);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        // With data buffered, finishing the build (eos = true) is expected to need memory ...
+        ASSERT_GT(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+        st = sink_operator->sink(runtime_state.get(), &block, true);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        // ... and none once the eos block has been sunk.
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+
+        ASSERT_GT(sink_operator->get_memory_usage(runtime_state.get()), 0);
+
+        st = local_state->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = sink_operator->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    };
+
+    run_test_block(test_block);
+}
+
+// Same flow as the Sink test, but terminate() is called on the sink operator
+// between the data block and the eos block; all later calls are still
+// expected to succeed with the same reserve-size and memory-usage results.
+TEST_F(HashJoinBuildSinkTest, Terminate) {
+    auto test_block = [&](TJoinOp::type op_type, const std::vector<TPrimitiveType::type>& key_types,
+                          const std::vector<bool>& left_nullables,
+                          const std::vector<bool>& right_nullables) {
+        auto tnode =
+                _helper.create_test_plan_node(op_type, key_types, left_nullables, right_nullables);
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        auto runtime_state = std::make_unique<MockRuntimeState>();
+        runtime_state->_query_ctx = _helper.query_ctx.get();
+        runtime_state->_query_id = _helper.query_ctx->query_id();
+        runtime_state->resize_op_id_to_local_state(-100);
+        runtime_state->set_max_operator_id(-100);
+        runtime_state->set_desc_tbl(_helper.desc_tbl);
+
+        // Initialising from a TDataSink is expected to be rejected by this operator.
+        auto st = sink_operator->init(TDataSink());
+        ASSERT_FALSE(st.ok());
+
+        st = sink_operator->init(tnode, runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        auto shared_state = sink_operator->create_shared_state();
+        ASSERT_TRUE(shared_state);
+
+        shared_state->create_source_dependency(sink_operator->operator_id(), tnode.node_id,
+                                               "HashJoinSinkTestDep");
+
+        std::map<int, std::pair<std::shared_ptr<BasicSharedState>,
+                                std::vector<std::shared_ptr<Dependency>>>>
+                shared_state_map;
+        LocalSinkStateInfo info {.task_idx = 0,
+                                 .parent_profile = _helper.runtime_profile.get(),
+                                 .sender_id = 0,
+                                 .shared_state = shared_state.get(),
+                                 .shared_state_map = shared_state_map,
+                                 .tsink = TDataSink()};
+        st = sink_operator->setup_local_state(runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        // assert_cast is the checked downcast already used elsewhere in this file;
+        // reinterpret_cast is not a valid way to navigate a class hierarchy.
+        auto* local_state = assert_cast<HashJoinBuildSinkLocalState*>(
+                runtime_state->get_sink_local_state());
+        ASSERT_TRUE(local_state);
+
+        st = local_state->open(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        // Nothing has been sunk yet, so no memory needs to be reserved.
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), false), 0);
+
+        const auto& row_desc = sink_operator->child()->row_desc();
+        vectorized::Block block(row_desc.tuple_descriptors()[0]->slots(), 0);
+
+        // Give every column two rows: one default value plus one extra element
+        // (inserted through the nullable interface for nullable columns).
+        auto mutable_block = vectorized::MutableBlock(block.clone_empty());
+        for (auto& col : mutable_block.mutable_columns()) {
+            col->insert_default();
+            if (col->is_nullable()) {
+                auto& nullable_column = assert_cast<vectorized::ColumnNullable&>(*col);
+                nullable_column.insert_not_null_elements(1);
+            } else {
+                col->insert_default();
+            }
+        }
+
+        auto block2 = mutable_block.to_block();
+        st = sink_operator->sink(runtime_state.get(), &block2, false);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        // The step under test: terminate while data is buffered but eos has not arrived.
+        st = sink_operator->terminate(runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "terminate failed: " << st.to_string();
+
+        ASSERT_GT(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+        st = sink_operator->sink(runtime_state.get(), &block, true);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        ASSERT_EQ(sink_operator->get_reserve_mem_size(runtime_state.get(), true), 0);
+
+        ASSERT_GT(sink_operator->get_memory_usage(runtime_state.get()), 0);
+
+        st = local_state->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        st = sink_operator->close(runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+    };
+
+    run_test_block(test_block);
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
new file mode 100644
index 00000000000..7c5fec3116c
--- /dev/null
+++ b/be/test/pipeline/operator/hashjoin_probe_operator_test.cpp
@@ -0,0 +1,1074 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gen_cpp/PlanNodes_types.h>
+#include <gen_cpp/Types_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <cassert>
+#include <cstddef>
+#include <memory>
+#include <type_traits>
+#include <utility>
+#include <vector>
+
+#include "hash_join_test_helper.h"
+#include "pipeline/dependency.h"
+#include "pipeline/exec/hashjoin_build_sink.h"
+#include "pipeline/exec/operator.h"
+#include "testutil/column_helper.h"
+#include "testutil/mock/mock_operators.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/core/field.h"
+#include "vec/core/sort_block.h"
+#include "vec/data_types/data_type_number.h"
+#include "vec/data_types/data_type_string.h"
+
+namespace doris::pipeline {
+
+using namespace vectorized;
+
+class HashJoinProbeOperatorTest : public testing::Test {
+public:
+ void SetUp() override { _helper.SetUp(); }
+ void TearDown() override { _helper.TearDown(); }
+
+    // Expects row `index` of `column` to be non-null and equal to `value`.
+    // std::string and StringRef values are compared as text; any other T is
+    // compared bytewise and the cell must be exactly sizeof(T) bytes wide.
+    template <typename T>
+    void check_column_value(const IColumn& column, const size_t index, const T& value) {
+        // Nullable columns are unwrapped so the payload is read from the nested column.
+        const StringRef cell = [&]() -> StringRef {
+            if (!column.is_nullable()) {
+                return column.get_data_at(index);
+            }
+            const auto& wrapper = assert_cast<const ColumnNullable&>(column);
+            EXPECT_FALSE(wrapper.is_null_at(index));
+            return wrapper.get_nested_column_ptr()->get_data_at(index);
+        }();
+
+        constexpr bool is_std_string = std::is_same_v<std::string, T>;
+        constexpr bool is_string_ref = std::is_same_v<StringRef, T>;
+        if constexpr (is_std_string) {
+            EXPECT_EQ(cell.to_string(), value);
+        } else if constexpr (is_string_ref) {
+            EXPECT_EQ(cell.to_string(), value.to_string());
+        } else {
+            EXPECT_EQ(sizeof(T), cell.size);
+            EXPECT_EQ(memcmp(cell.data, &value, sizeof(T)), 0);
+        }
+    }
+
+    // Asserts that the first values.size() rows of `column` equal `values`,
+    // comparing both the Field type and the Field value of each row.
+    // `loc` defaults to the call site so that failures point at the calling
+    // test line instead of this helper.
+    void check_column_values(const IColumn& column, const std::vector<vectorized::Field>& values,
+                             std::source_location loc = std::source_location::current()) {
+        // Guard the row-wise reads below: a column shorter than `values` would
+        // otherwise be indexed past its end.
+        ASSERT_GE(column.size(), values.size())
+                << "column has fewer rows than expected at: " << loc.file_name() << ":"
+                << loc.line();
+        for (size_t i = 0; i != values.size(); ++i) {
+            vectorized::Field value;
+            column.get(i, value);
+            ASSERT_EQ(value.get_type(), values[i].get_type())
+                    << "row: " << i << " type not match at: " << loc.file_name() << ":"
+                    << loc.line();
+            ASSERT_TRUE(value == values[i])
+                    << "row: " << i << " value not match at: " << loc.file_name() << ":"
+                    << loc.line();
+        }
+    }
+
+    // Returns a sorted copy of `block`. Rows are ordered by `sort_columns`
+    // (column positions, in the given priority); when that list is empty all
+    // columns are used, left to right. Every key uses direction 1 and nulls
+    // direction 1.
+    Block sort_block_by_columns(Block& block, const std::vector<size_t>& sort_columns = {}) {
+        SortDescription description;
+        if (sort_columns.empty()) {
+            const size_t column_count = block.columns();
+            for (size_t position = 0; position != column_count; ++position) {
+                description.emplace_back(position, 1, 1);
+            }
+        } else {
+            for (auto position : sort_columns) {
+                description.emplace_back(position, 1, 1);
+            }
+        }
+
+        auto result = block.clone_empty();
+        sort_block(block, result, description);
+        return result;
+    }
+
+    // Options describing the join that run_test() builds. The defaults
+    // describe a plain inner join: no mark join, no broadcast, no null-safe
+    // equality and no other join conjuncts.
+    struct JoinParams {
+        TJoinOp::type join_op_type = TJoinOp::INNER_JOIN;
+        bool is_mark_join = false;
+        size_t mark_join_conjuncts_size = 0;
+        bool is_broadcast_join = false;
+        bool null_safe_equal = false;
+        bool has_other_join_conjuncts = false;
+    };
+
+    // Runs one complete hash join and collects its result.
+    //
+    // A plan node is created from `join_params` and the key description, the
+    // build sink and probe operators are initialised and opened, every block of
+    // `build_blocks` is sunk into the build side, and every block of
+    // `probe_blocks` is fed through the probe side. All rows produced by the
+    // probe operator are merged and stored in `output_block`.
+    //
+    // For broadcast joins a second sink local state is additionally registered
+    // on a separate MockRuntimeState (task_idx 1) on top of an 8-instance
+    // HashJoinSharedState, and sinking through it before the first instance has
+    // finished is expected to fail.
+    //
+    // The blocks in `probe_blocks` are moved from. Failures are reported through
+    // fatal gtest assertions, in which case `output_block` is left untouched.
+    // NOLINTNEXTLINE(readability-function-*)
+    void run_test(const JoinParams& join_params, const std::vector<TPrimitiveType::type>& key_types,
+                  const std::vector<bool>& left_keys_nullable,
+                  const std::vector<bool>& right_keys_nullable, std::vector<Block>& build_blocks,
+                  std::vector<Block>& probe_blocks, Block& output_block) {
+        auto tnode = _helper.create_test_plan_node(
+                join_params.join_op_type, key_types, left_keys_nullable, right_keys_nullable,
+                join_params.is_mark_join, join_params.mark_join_conjuncts_size,
+                join_params.null_safe_equal, join_params.has_other_join_conjuncts);
+
+        // The instance driven through _helper.runtime_state always builds the hash
+        // table itself, so should_dry_run() is expected to be false below.
+        const bool should_build_hash_table = true;
+        if (join_params.is_broadcast_join) {
+            tnode.hash_join_node.__isset.is_broadcast_join = true;
+            tnode.hash_join_node.is_broadcast_join = true;
+        }
+
+        auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+        ASSERT_TRUE(probe_operator);
+        ASSERT_TRUE(sink_operator);
+
+        auto st = probe_operator->init(tnode, _helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = probe_operator->prepare(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        st = sink_operator->init(tnode, _helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+        st = sink_operator->prepare(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "prepare failed: " << st.to_string();
+
+        // Runtime state of the additional sink instance; only used for broadcast joins.
+        auto shared_runtime_state = std::make_shared<MockRuntimeState>();
+
+        auto shared_state = sink_operator->create_shared_state();
+        if (join_params.is_broadcast_join) {
+            shared_state = HashJoinSharedState::create_shared(8);
+            shared_state->create_source_dependencies(8, sink_operator->operator_id(),
+                                                     sink_operator->node_id(), "HASH_JOIN_PROBE");
+            shared_runtime_state->_query_ctx = _helper.runtime_state->_query_ctx;
+            shared_runtime_state->_query_id = _helper.runtime_state->_query_id;
+            shared_runtime_state->resize_op_id_to_local_state(-100);
+            shared_runtime_state->set_max_operator_id(-100);
+
+            LocalSinkStateInfo sink_local_state_info {
+                    .task_idx = 1,
+                    .parent_profile = _helper.runtime_profile.get(),
+                    .sender_id = 0,
+                    .shared_state = shared_state.get(),
+                    .shared_state_map = {},
+                    .tsink = TDataSink(),
+            };
+            st = sink_operator->setup_local_state(shared_runtime_state.get(),
+                                                  sink_local_state_info);
+            ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+            auto* sink_local_state = assert_cast<HashJoinBuildSinkLocalState*>(
+                    shared_runtime_state->get_sink_local_state());
+            ASSERT_TRUE(sink_local_state);
+
+            st = sink_local_state->open(shared_runtime_state.get());
+            ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+        }
+
+        ASSERT_EQ(probe_operator->is_broadcast_join(), join_params.is_broadcast_join);
+        ASSERT_TRUE(shared_state);
+        LocalSinkStateInfo sink_local_state_info {
+                .task_idx = 0,
+                .parent_profile = _helper.runtime_profile.get(),
+                .sender_id = 0,
+                .shared_state = shared_state.get(),
+                .shared_state_map = {},
+                .tsink = TDataSink(),
+        };
+
+        st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_local_state_info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        ASSERT_EQ(sink_operator->should_dry_run(_helper.runtime_state.get()),
+                  join_params.is_broadcast_join && !should_build_hash_table);
+
+        ASSERT_EQ(sink_operator->require_data_distribution(), false);
+        ASSERT_EQ(probe_operator->require_data_distribution(), false);
+        ASSERT_FALSE(sink_operator->is_shuffled_operator());
+        ASSERT_FALSE(probe_operator->is_shuffled_operator());
+        std::cout << "sink distribution: "
+                  << get_exchange_type_name(
+                             sink_operator->required_data_distribution().distribution_type)
+                  << std::endl;
+        std::cout << "probe distribution: "
+                  << get_exchange_type_name(
+                             probe_operator->required_data_distribution().distribution_type)
+                  << std::endl;
+
+        LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(),
+                             .scan_ranges = {},
+                             .shared_state = shared_state.get(),
+                             .shared_state_map = {},
+                             .task_idx = 0};
+        st = probe_operator->setup_local_state(_helper.runtime_state.get(), info);
+        ASSERT_TRUE(st.ok()) << "setup_local_state failed: " << st.to_string();
+
+        auto* sink_local_state = assert_cast<HashJoinBuildSinkLocalState*>(
+                _helper.runtime_state->get_sink_local_state());
+        ASSERT_TRUE(sink_local_state);
+
+        st = sink_local_state->open(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        auto* probe_local_state =
+                _helper.runtime_state->get_local_state(probe_operator->operator_id());
+        ASSERT_TRUE(probe_local_state);
+
+        st = probe_local_state->open(_helper.runtime_state.get());
+        ASSERT_TRUE(st.ok()) << "open failed: " << st.to_string();
+
+        std::cout << "probe debug string: " << probe_operator->debug_string(0) << std::endl;
+
+        // Build phase: feed all build-side blocks, then finish with an empty eos block.
+        for (auto& build_block : build_blocks) {
+            st = sink_operator->sink(_helper.runtime_state.get(), &build_block, false);
+            ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+        }
+        ASSERT_EQ(sink_local_state->build_unique(),
+                  tnode.hash_join_node.other_join_conjuncts.empty() &&
+                          (tnode.hash_join_node.join_op == TJoinOp::LEFT_ANTI_JOIN ||
+                           tnode.hash_join_node.join_op == TJoinOp::LEFT_SEMI_JOIN ||
+                           tnode.hash_join_node.join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN));
+
+        Block empty_block;
+
+        if (join_params.is_broadcast_join) {
+            // The second instance must not be able to sink before the first one finished.
+            st = sink_operator->sink(shared_runtime_state.get(), &empty_block, false);
+            ASSERT_FALSE(st.ok());
+        }
+
+        st = sink_operator->sink(_helper.runtime_state.get(), &empty_block, true);
+        ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+        st = sink_operator->close(_helper.runtime_state.get(), st);
+        ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+
+        if (join_params.is_broadcast_join) {
+            st = sink_operator->sink(shared_runtime_state.get(), &empty_block, true);
+            ASSERT_TRUE(st.ok()) << "sink failed: " << st.to_string();
+
+            st = sink_operator->close(shared_runtime_state.get(), st);
+            ASSERT_TRUE(st.ok()) << "close failed: " << st.to_string();
+        }
+
+        auto source_operator =
+                std::dynamic_pointer_cast<MockSourceOperator>(probe_operator->child());
+        ASSERT_TRUE(source_operator);
+
+        // Probe phase: push each probe block through the mock child and merge the output.
+        bool eos = false;
+        Block tmp_output_block;
+        MutableBlock output_block_mutable;
+        for (auto& probe_block : probe_blocks) {
+            source_operator->set_block(std::move(probe_block));
+            st = probe_operator->get_block_after_projects(_helper.runtime_state.get(),
+                                                          &tmp_output_block, &eos);
+            ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+
+            if (tmp_output_block.empty()) {
+                continue;
+            }
+
+            st = output_block_mutable.merge(std::move(tmp_output_block));
+            tmp_output_block.clear();
+            ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string();
+        }
+
+        // The probe operator may already have reported eos while consuming the
+        // probe blocks; only drain it when it has not.
+        if (!eos) {
+            source_operator->set_eos();
+
+            st = probe_operator->get_block(_helper.runtime_state.get(), &tmp_output_block, &eos);
+            ASSERT_TRUE(st.ok()) << "get_block failed: " << st.to_string();
+            ASSERT_TRUE(eos);
+
+            if (!tmp_output_block.empty()) {
+                st = output_block_mutable.merge(std::move(tmp_output_block));
+                ASSERT_TRUE(st.ok()) << "merge failed: " << st.to_string();
+            }
+        }
+
+        // Publish the merged rows on every successful path, including the early-eos
+        // one, so rows collected in the loop above are never dropped.
+        output_block = output_block_mutable.to_block();
+    }
+
+protected:
+ HashJoinTestHelper _helper;
+};
+
+// Inner join on (INT, STRING) keys where each side has one nullable key
+// column. Only the rows with keys (3, "c") and (4, "d") are expected to
+// match; the second list passed to the nullable helpers is presumably the
+// null map (1 = NULL) - confirm against ColumnHelper.
+TEST_F(HashJoinProbeOperatorTest, InnerJoin) {
+ // Build side: non-null INT key, nullable STRING key.
+ auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+ {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+ // Probe side: nullable INT key, non-null STRING key.
+ auto probe_block =
+ ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4,
5}, {0, 1, 0, 0, 1});
+ probe_block.insert(
+ ColumnHelper::create_column_with_name<DataTypeString>({"a", "b",
"c", "d", "e"}));
+
+ Block output_block;
+ std::vector<Block> build_blocks = {sink_block};
+ std::vector<Block> probe_blocks = {probe_block};
+ run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT,
TPrimitiveType::STRING}, {true, false},
+ {false, true}, build_blocks, probe_blocks, output_block);
+
+ ASSERT_EQ(output_block.rows(), 2);
+
+ // Sort by all columns so the row order is deterministic before comparing.
+ auto sorted_block = sort_block_by_columns(output_block);
+ std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+ check_column_values(*sorted_block.get_by_position(0).column, {3, 4});
+ check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"});
+ check_column_values(*sorted_block.get_by_position(2).column, {3, 4});
+ check_column_values(*sorted_block.get_by_position(3).column, {"c", "d"});
+}
+
+// Inner join with a build side that contains no rows: no output rows are
+// expected, whatever the probe side contains.
+TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyBuildSide) {
+ // Build side: correct column layout, zero rows.
+ auto sink_block = ColumnHelper::create_block<DataTypeInt32>({});
+
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({},
{}));
+
+ auto probe_block =
+ ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4,
5}, {0, 1, 0, 0, 1});
+ probe_block.insert(
+ ColumnHelper::create_column_with_name<DataTypeString>({"a", "b",
"c", "d", "e"}));
+
+ Block output_block;
+ std::vector<Block> build_blocks = {sink_block};
+ std::vector<Block> probe_blocks = {probe_block};
+ run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT,
TPrimitiveType::STRING}, {true, false},
+ {false, true}, build_blocks, probe_blocks, output_block);
+
+ ASSERT_EQ(output_block.rows(), 0);
+}
+
+// Inner join with a probe side that contains no rows: no output rows are
+// expected, although the build side is populated.
+TEST_F(HashJoinProbeOperatorTest, InnerJoinEmptyProbeSide) {
+ auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+ {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+ // Probe side: correct column layout, zero rows.
+ auto probe_block = ColumnHelper::create_nullable_block<DataTypeInt32>({},
{});
+
probe_block.insert(ColumnHelper::create_column_with_name<DataTypeString>({}));
+
+ Block output_block;
+ std::vector<Block> build_blocks = {sink_block};
+ std::vector<Block> probe_blocks = {probe_block};
+ run_test({TJoinOp::INNER_JOIN}, {TPrimitiveType::INT,
TPrimitiveType::STRING}, {true, false},
+ {false, true}, build_blocks, probe_blocks, output_block);
+
+ ASSERT_EQ(output_block.rows(), 0);
+}
+
+// Inner join with an additional non-equi join conjunct. Both sides carry a
+// third nullable INT column; the conjunct itself is created by the test
+// helper and is not visible here. Of the key-matching rows only those with
+// keys (1, "a") and (3, "c") are expected to survive it.
+TEST_F(HashJoinProbeOperatorTest, InnerJoinOtherConjuncts) {
+ auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+ {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1}));
+
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+ {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1}));
+
+ auto probe_block =
+ ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4,
5}, {0, 0, 0, 0, 1});
+ probe_block.insert(
+ ColumnHelper::create_column_with_name<DataTypeString>({"a", "b",
"c", "d", "e"}));
+
probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+ {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1}));
+
+ Block output_block;
+ std::vector<Block> build_blocks = {sink_block};
+ std::vector<Block> probe_blocks = {probe_block};
+ run_test({.join_op_type = TJoinOp::INNER_JOIN, .has_other_join_conjuncts =
true},
+ {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false},
{false, true},
+ build_blocks, probe_blocks, output_block);
+
+ ASSERT_EQ(output_block.rows(), 2);
+
+ auto sorted_block = sort_block_by_columns(output_block);
+ std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+ // Columns 0-2 hold the probe-side values, columns 3-5 the build-side values.
+ check_column_values(*sorted_block.get_by_position(0).column, {1, 3});
+ check_column_values(*sorted_block.get_by_position(1).column, {"a", "c"});
+ check_column_values(*sorted_block.get_by_position(2).column, {101, 102});
+ check_column_values(*sorted_block.get_by_position(3).column, {1, 3});
+ check_column_values(*sorted_block.get_by_position(4).column, {"a", "c"});
+ check_column_values(*sorted_block.get_by_position(5).column, {51, 59});
+}
+
+// Inner join using null-safe equality. Besides the rows with keys (3, "c")
+// and (4, "d"), the row with key 5 and a NULL string on both sides is
+// expected to match, giving three output rows.
+TEST_F(HashJoinProbeOperatorTest, InnerJoinNullSafeEqual) {
+ auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+ {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+ // Both probe-side key columns are nullable in this test.
+ auto probe_block =
+ ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4,
5}, {0, 1, 0, 0, 0});
+
probe_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+ {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1}));
+
+ Block output_block;
+ std::vector<Block> build_blocks = {sink_block};
+ std::vector<Block> probe_blocks = {probe_block};
+ run_test({.join_op_type = TJoinOp::INNER_JOIN, .null_safe_equal = true},
+ {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, true},
{false, true},
+ build_blocks, probe_blocks, output_block);
+
+ ASSERT_EQ(output_block.rows(), 3);
+
+ auto sorted_block = sort_block_by_columns(output_block);
+ std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+ check_column_values(*sorted_block.get_by_position(0).column, {3, 4, 5});
+ check_column_values(*sorted_block.get_by_position(1).column, {"c", "d",
Null()});
+ check_column_values(*sorted_block.get_by_position(2).column, {3, 4, 5});
+ check_column_values(*sorted_block.get_by_position(3).column, {"c", "d",
Null()});
+}
+
+// Negative test: after the nullability flag of slot 4 is flipped in the
+// descriptor table, init() of the probe operator is still expected to
+// succeed but prepare() is expected to fail.
+TEST_F(HashJoinProbeOperatorTest, CheckSlot) {
+ auto tnode = _helper.create_test_plan_node(TJoinOp::INNER_JOIN,
+ {TPrimitiveType::INT,
TPrimitiveType::STRING},
+ {true, false}, {false, true},
false);
+
+ auto [probe_operator, sink_operator] = _helper.create_operators(tnode);
+ ASSERT_TRUE(probe_operator);
+ ASSERT_TRUE(sink_operator);
+
+ // NOTE(review): `desc_tbl` is a function-local object and its address is
+ // handed to the runtime state; confirm nothing dereferences it after this
+ // test body returns (e.g. during TearDown).
+ auto desc_tbl = _helper.runtime_state->desc_tbl();
+ desc_tbl._slot_desc_map[4]->_is_nullable =
!desc_tbl._slot_desc_map[4]->_is_nullable;
+ _helper.runtime_state->set_desc_tbl(&desc_tbl);
+
+ auto st = probe_operator->init(tnode, _helper.runtime_state.get());
+ ASSERT_TRUE(st.ok()) << "init failed: " << st.to_string();
+
+ st = probe_operator->prepare(_helper.runtime_state.get());
+ ASSERT_FALSE(st.ok());
+}
+
+// Same data and expectations as the InnerJoin test, but run as a broadcast
+// join, which makes run_test() register a second build sink instance on a
+// shared hash join state.
+TEST_F(HashJoinProbeOperatorTest, InnerJoinBroadcast) {
+ auto sink_block = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4,
5});
+
sink_block.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+ {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+ auto probe_block =
+ ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4,
5}, {0, 1, 0, 0, 1});
+ probe_block.insert(
+ ColumnHelper::create_column_with_name<DataTypeString>({"a", "b",
"c", "d", "e"}));
+
+ Block output_block;
+ std::vector<Block> build_blocks = {sink_block};
+ std::vector<Block> probe_blocks = {probe_block};
+ run_test({.join_op_type = TJoinOp::INNER_JOIN, .is_broadcast_join = true},
+ {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false},
{false, true},
+ build_blocks, probe_blocks, output_block);
+
+ ASSERT_EQ(output_block.rows(), 2);
+
+ // Sort by all columns so the row order is deterministic before comparing.
+ auto sorted_block = sort_block_by_columns(output_block);
+ std::cout << "Output block: " << sorted_block.dump_data() << std::endl;
+
+ check_column_values(*sorted_block.get_by_position(0).column, {3, 4});
+ check_column_values(*sorted_block.get_by_position(1).column, {"c", "d"});
+ check_column_values(*sorted_block.get_by_position(2).column, {3, 4});
+ check_column_values(*sorted_block.get_by_position(3).column, {"c", "d"});
+}
+
+// FULL OUTER JOIN over five build rows and five probe rows; expects eight output rows.
+TEST_F(HashJoinProbeOperatorTest, FullOuterJoin) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(joined.rows(), 8);
+
+    // Columns 0-1 are checked against probe-side values, columns 2-3 against build-side values.
+    check_column_values(*ordered.get_by_position(0).column,
+                        {1, 3, 4, Null(), Null(), Null(), Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column,
+                        {"a", "c", "d", "b", "e", Null(), Null(), Null()});
+    check_column_values(*ordered.get_by_position(2).column,
+                        {Null(), 3, 4, Null(), Null(), 1, 2, 5});
+    check_column_values(*ordered.get_by_position(3).column,
+                        {Null(), "c", "d", Null(), Null(), Null(), "b", Null()});
+}
+
+// FULL OUTER JOIN with a zero-row build input; expects all five probe rows in the output.
+TEST_F(HashJoinProbeOperatorTest, FullOuterJoinEmptyBuildSide) {
+    // Build input: same two-column layout as the other cases, but with no rows.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(joined.rows(), 5);
+}
+
+// FULL OUTER JOIN with a zero-row probe input; expects all five build rows in the output.
+TEST_F(HashJoinProbeOperatorTest, FullOuterJoinEmptyProbeSide) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: same two-column layout as the other cases, but with no rows.
+    auto probe_input = ColumnHelper::create_nullable_block<DataTypeInt32>({}, {});
+    probe_input.insert(ColumnHelper::create_column_with_name<DataTypeString>({}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::FULL_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(joined.rows(), 5);
+}
+
+// LEFT OUTER JOIN over five build rows and five probe rows; only the row count (5) is checked.
+TEST_F(HashJoinProbeOperatorTest, LeftOuterJoin) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 5);
+}
+
+// LEFT OUTER JOIN where the probe input has seven rows, including repeated key values;
+// expects seven output rows.
+TEST_F(HashJoinProbeOperatorTest, LeftOuterJoin2) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column, seven rows each.
+    auto probe_input = ColumnHelper::create_nullable_block<DataTypeInt32>(
+            {1, 2, 3, 4, 5, 2, 3}, {0, 1, 0, 0, 1, 0, 0});
+    probe_input.insert(ColumnHelper::create_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e", "b", "c"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 7);
+
+    check_column_values(*ordered.get_by_position(0).column, {1, 2, 3, 3, 4, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column,
+                        {"a", "b", "c", "c", "d", "b", "e"});
+    check_column_values(*ordered.get_by_position(2).column,
+                        {Null(), 2, 3, 3, 4, Null(), Null()});
+    check_column_values(*ordered.get_by_position(3).column,
+                        {Null(), "b", "c", "c", "d", Null(), Null()});
+}
+
+// RIGHT OUTER JOIN over five build rows and five probe rows; expects five output rows.
+TEST_F(HashJoinProbeOperatorTest, RightOuterJoin) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 5);
+
+    check_column_values(*ordered.get_by_position(0).column, {3, 4, Null(), Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column,
+                        {"c", "d", Null(), Null(), Null()});
+    check_column_values(*ordered.get_by_position(2).column, {3, 4, 1, 2, 5});
+    check_column_values(*ordered.get_by_position(3).column, {"c", "d", Null(), "b", Null()});
+}
+
+// RIGHT OUTER JOIN with a zero-row build input; expects an empty result.
+TEST_F(HashJoinProbeOperatorTest, RightOuterJoinEmptyBuildSide) {
+    // Build input: same two-column layout as the other cases, but with no rows.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 0);
+}
+
+// RIGHT OUTER JOIN where the probe input has seven rows, including repeated key values;
+// expects six output rows.
+TEST_F(HashJoinProbeOperatorTest, RightOuterJoin2) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column, seven rows each.
+    auto probe_input = ColumnHelper::create_nullable_block<DataTypeInt32>(
+            {1, 2, 3, 4, 5, 2, 3}, {0, 1, 0, 0, 1, 0, 0});
+    probe_input.insert(ColumnHelper::create_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e", "b", "c"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::RIGHT_OUTER_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 6);
+
+    check_column_values(*ordered.get_by_position(0).column, {2, 3, 3, 4, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column,
+                        {"b", "c", "c", "d", Null(), Null()});
+    check_column_values(*ordered.get_by_position(2).column, {2, 3, 3, 4, 1, 5});
+    check_column_values(*ordered.get_by_position(3).column,
+                        {"b", "c", "c", "d", Null(), Null()});
+}
+
+// LEFT ANTI JOIN; expects three probe rows in the output, with two output columns checked.
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoin) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 3);
+
+    check_column_values(*ordered.get_by_position(0).column, {1, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column, {"a", "b", "e"});
+}
+
+// LEFT SEMI JOIN; expects two probe rows (keys 3 and 4) in the output.
+TEST_F(HashJoinProbeOperatorTest, LeftSemiJoin) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 2);
+    check_column_values(*ordered.get_by_position(0).column, {3, 4});
+    check_column_values(*ordered.get_by_position(1).column, {"c", "d"});
+}
+
+// LEFT SEMI JOIN with a zero-row build input; expects an empty result.
+TEST_F(HashJoinProbeOperatorTest, LeftSemiJoinEmptyBuildSide) {
+    // Build input: same two-column layout as the other cases, but with no rows.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 0);
+}
+
+// RIGHT ANTI JOIN; expects three build rows (keys 1, 2 and 5) in the output.
+TEST_F(HashJoinProbeOperatorTest, RightAntiJoin) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::RIGHT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 3);
+
+    check_column_values(*ordered.get_by_position(0).column, {1, 2, 5});
+    check_column_values(*ordered.get_by_position(1).column, {Null(), "b", Null()});
+}
+
+// RIGHT SEMI JOIN; expects two build rows (keys 3 and 4) in the output.
+TEST_F(HashJoinProbeOperatorTest, RightSemiJoin) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::RIGHT_SEMI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 2);
+
+    check_column_values(*ordered.get_by_position(0).column, {3, 4});
+    check_column_values(*ordered.get_by_position(1).column, {"c", "d"});
+}
+
+// NULL_AWARE_LEFT_ANTI_JOIN with the null flag set on two build STRING values;
+// expects an empty result.
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoin) {
+    // Build input: INT column plus a nullable STRING column ("f", "g", "h", "d", "e").
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"f", "g", "h", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 0);
+}
+
+// NULL_AWARE_LEFT_ANTI_JOIN with a zero-row build input; expects all five probe rows.
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinEmptyBuildSide) {
+    // Build input: same two-column layout as the other cases, but with no rows.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>({}, {}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 5);
+}
+
+// NULL_AWARE_LEFT_ANTI_JOIN with `has_other_join_conjuncts` set and a third nullable INT
+// column on both inputs; expects an empty result.
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinOtherConjuncts) {
+    // Build input: INT column, nullable STRING column, nullable INT column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column, STRING column, nullable INT column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    probe_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({.join_op_type = TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
+              .has_other_join_conjuncts = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 0);
+}
+
+// NULL_AWARE_LEFT_ANTI_JOIN with no null flags set on the build STRING column;
+// expects a single output row (2, "a").
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoin2) {
+    // Build input: INT column plus a nullable STRING column with every null flag cleared.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 0}));
+
+    // Probe input: nullable INT column (first value is 2, not 1) plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 1);
+    check_column_values(*ordered.get_by_position(0).column, {2});
+    check_column_values(*ordered.get_by_position(1).column, {"a"});
+}
+
+// NULL_AWARE_LEFT_ANTI_JOIN with `has_other_join_conjuncts` set, using a probe INT column
+// that starts with 2 instead of 1; expects an empty result.
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinOtherConjuncts2) {
+    // Build input: INT column, nullable STRING column, nullable INT column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1}));
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column, STRING column, nullable INT column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    probe_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({.join_op_type = TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN,
+              .has_other_join_conjuncts = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 0);
+}
+
+// LEFT ANTI JOIN with no null flags set on the build STRING column and a probe INT column
+// that starts with 2; expects three output rows.
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoin2) {
+    // Build input: INT column plus a nullable STRING column with every null flag cleared.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 0}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({2, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_ANTI_JOIN}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    Block ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    ASSERT_EQ(ordered.rows(), 3);
+    check_column_values(*ordered.get_by_position(0).column, {2, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column, {"a", "b", "e"});
+}
+
+// NULL_AWARE_LEFT_ANTI_JOIN with the second positional option set to true (presumably
+// is_mark_join - confirm against the options struct); expects five rows and checks a third
+// output column.
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftAntiJoinMark) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN, true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_side, probe_side, joined);
+
+    ASSERT_EQ(joined.rows(), 5);
+
+    auto ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+    check_column_values(*ordered.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*ordered.get_by_position(2).column, {Null(), 0, 0, Null(), Null()});
+}
+
+// NULL_AWARE_LEFT_SEMI_JOIN with the second positional option set to true (presumably
+// is_mark_join - confirm against the options struct); expects five rows and checks a third
+// output column.
+TEST_F(HashJoinProbeOperatorTest, NullAwareLeftSemiJoinMark) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN, true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_side, probe_side, joined);
+
+    ASSERT_EQ(joined.rows(), 5);
+
+    auto ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data() << std::endl;
+
+    check_column_values(*ordered.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*ordered.get_by_position(2).column, {Null(), 1, 1, Null(), Null()});
+}
+
+// LEFT_SEMI_JOIN with positional options `true, 1` (presumably is_mark_join and
+// mark_join_conjuncts_size - confirm against the options struct); expects five rows and
+// checks a third output column.
+TEST_F(HashJoinProbeOperatorTest, LeftSemiJoinMark) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_SEMI_JOIN, true, 1}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    ASSERT_EQ(joined.rows(), 5);
+
+    auto ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data(0, 100, true) << std::endl;
+
+    check_column_values(*ordered.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*ordered.get_by_position(2).column, {0, 1, 1, Null(), 0});
+}
+
+// LEFT_ANTI_JOIN with positional options `true, 1` (presumably is_mark_join and
+// mark_join_conjuncts_size - confirm against the options struct); expects five rows and
+// checks a third output column.
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMark) {
+    // Build input: INT column plus a nullable STRING column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {1, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column plus a STRING column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 1, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({TJoinOp::LEFT_ANTI_JOIN, true, 1}, {TPrimitiveType::INT, TPrimitiveType::STRING},
+             {true, false}, {false, true}, build_side, probe_side, joined);
+
+    ASSERT_EQ(joined.rows(), 5);
+
+    auto ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data(0, 100, true) << std::endl;
+
+    check_column_values(*ordered.get_by_position(0).column, {1, 3, 4, Null(), Null()});
+    check_column_values(*ordered.get_by_position(1).column, {"a", "c", "d", "b", "e"});
+    check_column_values(*ordered.get_by_position(2).column, {1, 0, 0, Null(), 1});
+}
+
+// LEFT_ANTI_JOIN configured as a mark join (one mark conjunct) with other join conjuncts
+// enabled and a third nullable INT column on both inputs; expects five rows and checks
+// four output columns.
+TEST_F(HashJoinProbeOperatorTest, LeftAntiJoinMarkOtherConjuncts) {
+    // Build input: INT column, nullable STRING column, nullable INT column.
+    auto build_input = ColumnHelper::create_block<DataTypeInt32>({1, 2, 3, 4, 5});
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeString>(
+            {"a", "b", "c", "d", "e"}, {0, 0, 0, 0, 1}));
+    build_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {51, 52, 59, 52, 200}, {0, 0, 0, 0, 1}));
+
+    // Probe input: nullable INT column, STRING column, nullable INT column.
+    auto probe_input =
+            ColumnHelper::create_nullable_block<DataTypeInt32>({1, 2, 3, 4, 5}, {0, 0, 0, 0, 1});
+    probe_input.insert(
+            ColumnHelper::create_column_with_name<DataTypeString>({"a", "b", "c", "d", "e"}));
+    probe_input.insert(ColumnHelper::create_nullable_column_with_name<DataTypeInt32>(
+            {101, 100, 102, 99, 200}, {0, 0, 0, 0, 1}));
+
+    Block joined;
+    std::vector<Block> build_side {build_input};
+    std::vector<Block> probe_side {probe_input};
+    run_test({.join_op_type = TJoinOp::LEFT_ANTI_JOIN,
+              .is_mark_join = true,
+              .mark_join_conjuncts_size = 1,
+              .has_other_join_conjuncts = true},
+             {TPrimitiveType::INT, TPrimitiveType::STRING}, {true, false}, {false, true},
+             build_side, probe_side, joined);
+
+    ASSERT_EQ(joined.rows(), 5);
+
+    auto ordered = sort_block_by_columns(joined);
+    std::cout << "Output block: " << ordered.dump_data(0, 100, true) << std::endl;
+
+    check_column_values(*ordered.get_by_position(0).column, {1, 2, 3, 4, Null()});
+    check_column_values(*ordered.get_by_position(1).column, {"a", "b", "c", "d", "e"});
+    check_column_values(*ordered.get_by_position(2).column, {101, 100, 102, 99, Null()});
+    check_column_values(*ordered.get_by_position(3).column, {0, 1, 0, 1, 1});
+}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/join_test_helper.cpp
b/be/test/pipeline/operator/join_test_helper.cpp
new file mode 100644
index 00000000000..d6828621590
--- /dev/null
+++ b/be/test/pipeline/operator/join_test_helper.cpp
@@ -0,0 +1,43 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "join_test_helper.h"
+
+#include "testutil/creators.h"
+
+namespace doris::pipeline {
+
+// Creates the mock runtime state, object pool, runtime profile and query context used by
+// the join tests, then binds the runtime state to that query context.
+void JoinTestHelper::SetUp() {
+    runtime_state = std::make_unique<MockRuntimeState>();
+    obj_pool = std::make_unique<ObjectPool>();
+    runtime_profile = std::make_shared<RuntimeProfile>("test");
+    query_ctx = generate_one_query();
+
+    // Point the mock state at the generated query and size its operator bookkeeping.
+    auto& state = *runtime_state;
+    state._query_ctx = query_ctx.get();
+    state._query_id = query_ctx->query_id();
+    state.resize_op_id_to_local_state(-100);
+    state.set_max_operator_id(-100);
+
+    // Register the counters on the test profile.
+    auto* profile = runtime_profile.get();
+    ADD_TIMER(profile, "ExecTime");
+    profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 0);
+}
+
+// Counterpart to SetUp(); the body is empty, so no explicit cleanup is performed here.
+void JoinTestHelper::TearDown() {}
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/pipeline/operator/join_test_helper.h
b/be/test/pipeline/operator/join_test_helper.h
new file mode 100644
index 00000000000..b94b99c8aa2
--- /dev/null
+++ b/be/test/pipeline/operator/join_test_helper.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gen_cpp/DataSinks_types.h>
+#include <gen_cpp/Descriptors_types.h>
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-function-mocker.h>
+#include <gmock/gmock-spec-builders.h>
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
+#include <memory>
+
+#include "common/object_pool.h"
+#include "pipeline/pipeline_task.h"
+#include "runtime/fragment_mgr.h"
+#include "testutil/mock/mock_runtime_state.h"
+#include "util/runtime_profile.h"
+#include "vec/spill/spill_stream_manager.h"
+
+namespace doris::pipeline {
+// Holder for the fixtures shared by the join operator tests. SetUp() and TearDown() are
+// defined in join_test_helper.cpp.
+class JoinTestHelper {
+public:
+    virtual ~JoinTestHelper() = default;
+
+    // Populates runtime_state, obj_pool, query_ctx and runtime_profile.
+    void SetUp();
+    // Currently does nothing.
+    void TearDown();
+
+    // virtual TPlanNode create_test_plan_node() = 0;
+    // virtual TDescriptorTable create_test_table_descriptor(bool nullable) = 0;
+
+    std::unique_ptr<MockRuntimeState> runtime_state;
+    std::unique_ptr<ObjectPool> obj_pool;
+    std::shared_ptr<QueryContext> query_ctx;
+    std::shared_ptr<RuntimeProfile> runtime_profile;
+    // Not assigned by SetUp(); stays empty until a test sets it.
+    std::shared_ptr<PipelineTask> pipeline_task;
+    // Not assigned by SetUp(). Initialised to nullptr so that reading it before a test
+    // assigns it is well-defined instead of yielding an indeterminate pointer value.
+    // NOTE(review): presumably non-owning -- confirm against the tests that set it.
+    DescriptorTbl* desc_tbl = nullptr;
+};
+
+} // namespace doris::pipeline
\ No newline at end of file
diff --git a/be/test/testutil/mock/mock_operators.h
b/be/test/testutil/mock/mock_operators.h
index 58b1a738926..0c6136880ad 100644
--- a/be/test/testutil/mock/mock_operators.h
+++ b/be/test/testutil/mock/mock_operators.h
@@ -49,6 +49,13 @@ private:
bool _eos = false;
};
+// Mock operator derived from MockChildOperator that reports itself as a source.
+class MockSourceOperator : public MockChildOperator {
+public:
+    // Performs no work and always returns Status::OK().
+    Status close(RuntimeState* state) override { return Status::OK(); }
+
+    // Always true for this mock.
+    bool is_source() const override { return true; }
+};
+
class MockSinkOperator final : public DataSinkOperatorXBase {
public:
Status sink(RuntimeState* state, vectorized::Block* block, bool eos)
override {
diff --git a/be/test/testutil/mock/mock_runtime_state.h
b/be/test/testutil/mock/mock_runtime_state.h
index 2b3888bf518..0831ba80404 100644
--- a/be/test/testutil/mock/mock_runtime_state.h
+++ b/be/test/testutil/mock/mock_runtime_state.h
@@ -41,12 +41,17 @@ public:
return _enable_shared_exchange_sink_buffer;
}
+    // Returns the current value of the _enable_share_hash_table_for_broadcast_join member.
+    bool enable_share_hash_table_for_broadcast_join() const override {
+        return _enable_share_hash_table_for_broadcast_join;
+    }
+
bool enable_local_exchange() const override { return true; }
WorkloadGroupPtr workload_group() override { return _workload_group; }
// default batch size
int batsh_size = 4096;
bool _enable_shared_exchange_sink_buffer = true;
+ bool _enable_share_hash_table_for_broadcast_join = true;
std::shared_ptr<MockContext> _mock_context =
std::make_shared<MockContext>();
std::shared_ptr<MockQueryContext> _query_ctx_uptr =
std::make_shared<MockQueryContext>();
WorkloadGroupPtr _workload_group = nullptr;
diff --git a/be/test/testutil/run_all_tests.cpp
b/be/test/testutil/run_all_tests.cpp
index ac43c7ace14..a9688a3656d 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -20,6 +20,7 @@
#include "common/config.h"
#include "common/logging.h"
+#include "common/phdr_cache.h"
#include "common/status.h"
#include "gtest/gtest.h"
#include "olap/page_cache.h"
@@ -97,8 +98,23 @@ int main(int argc, char** argv) {
doris::ExecEnv::GetInstance()->set_tracking_memory(false);
google::ParseCommandLineFlags(&argc, &argv, false);
- int res = RUN_ALL_TESTS();
- doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr);
- return res;
+ updatePHDRCache();
+ try {
+ int res = RUN_ALL_TESTS();
+
doris::ExecEnv::GetInstance()->set_non_block_close_thread_pool(nullptr);
+ return res;
+ } catch (doris::Exception& e) {
+ LOG(FATAL) << "Exception: " << e.what();
+ } catch (...) {
+ auto eptr = std::current_exception();
+ try {
+ std::rethrow_exception(eptr);
+ } catch (const std::exception& e) {
+ LOG(FATAL) << "Unknown exception: " << e.what();
+ } catch (...) {
+ LOG(FATAL) << "Unknown exception";
+ }
+ return -1;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]