This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch new_join
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/new_join by this push:
new 2b8e9340cdd Join rewrite (#26140)
2b8e9340cdd is described below
commit 2b8e9340cdd5c1daecde8f5b1670c7d97dbce146
Author: HappenLee <[email protected]>
AuthorDate: Tue Oct 31 11:03:35 2023 +0800
Join rewrite (#26140)
Co-authored-by: BiteTheDDDDt <[email protected]>
---
be/src/exprs/runtime_filter_slots.h | 36 ++-
be/src/pipeline/exec/hashjoin_build_sink.cpp | 146 +++------
be/src/pipeline/exec/hashjoin_build_sink.h | 7 +-
be/src/pipeline/exec/hashjoin_probe_operator.h | 4 +-
be/src/pipeline/exec/set_sink_operator.cpp | 13 +-
be/src/pipeline/exec/set_sink_operator.h | 2 +-
be/src/pipeline/exec/set_source_operator.cpp | 6 +-
be/src/pipeline/pipeline_x/dependency.h | 7 +-
be/src/vec/columns/column.h | 9 +-
be/src/vec/columns/column_array.cpp | 11 +
be/src/vec/columns/column_array.h | 3 +
be/src/vec/columns/column_complex.h | 15 +
be/src/vec/columns/column_const.h | 5 +
be/src/vec/columns/column_decimal.h | 12 +
be/src/vec/columns/column_dictionary.h | 5 +
be/src/vec/columns/column_fixed_length_object.h | 22 ++
be/src/vec/columns/column_map.cpp | 11 +
be/src/vec/columns/column_map.h | 3 +
be/src/vec/columns/column_nullable.cpp | 10 +
be/src/vec/columns/column_nullable.h | 3 +
be/src/vec/columns/column_object.cpp | 11 +
be/src/vec/columns/column_object.h | 3 +
be/src/vec/columns/column_string.cpp | 37 +++
be/src/vec/columns/column_string.h | 3 +
be/src/vec/columns/column_struct.cpp | 9 +
be/src/vec/columns/column_struct.h | 3 +
be/src/vec/columns/column_vector.cpp | 23 ++
be/src/vec/columns/column_vector.h | 2 +
be/src/vec/columns/predicate_column.h | 5 +
be/src/vec/common/hash_table/hash_map.h | 214 ++++++++++++
be/src/vec/common/hash_table/hash_map_context.h | 125 ++++++--
be/src/vec/common/hash_table/hash_table.h | 1 -
.../vec/common/hash_table/hash_table_set_build.h | 9 +-
be/src/vec/exec/join/join_op.h | 72 ++---
be/src/vec/exec/join/process_hash_table_probe.h | 9 +-
.../vec/exec/join/process_hash_table_probe_impl.h | 357 +++------------------
be/src/vec/exec/join/vhash_join_node.cpp | 115 +++----
be/src/vec/exec/join/vhash_join_node.h | 156 ++-------
be/src/vec/exec/vset_operation_node.cpp | 68 ++--
be/src/vec/exec/vset_operation_node.h | 5 +-
be/src/vec/runtime/shared_hash_table_controller.h | 11 +-
41 files changed, 787 insertions(+), 781 deletions(-)
diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h
index 6c96b160551..0f841e5a60f 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -162,7 +162,7 @@ public:
         return Status::OK();
     }
-    void insert(std::unordered_map<const vectorized::Block*, std::vector<int>>& datas) {
+    void insert(const std::unordered_set<const vectorized::Block*>& datas) {
         for (int i = 0; i < _build_expr_context.size(); ++i) {
             auto iter = _runtime_filters.find(i);
             if (iter == _runtime_filters.end()) {
@@ -170,30 +170,32 @@ public:
             }
             int result_column_id = _build_expr_context[i]->get_last_result_column_id();
-            for (auto it : datas) {
-                auto& column = it.first->get_by_position(result_column_id).column;
+            for (const auto* it : datas) {
+                auto column = it->get_by_position(result_column_id).column;
-                if (auto* nullable =
+                std::vector<int> indexs;
+                // indexs start from 1 because the first row is mocked for join hash map
+                if (const auto* nullable =
                             vectorized::check_and_get_column<vectorized::ColumnNullable>(*column)) {
-                    auto& column_nested = nullable->get_nested_column_ptr();
-                    auto& column_nullmap = nullable->get_null_map_column_ptr();
-                    std::vector<int> indexs;
-                    for (int row_num : it.second) {
-                        if (assert_cast<const vectorized::ColumnUInt8*>(column_nullmap.get())
-                                    ->get_bool(row_num)) {
+                    column = nullable->get_nested_column_ptr();
+                    const uint8_t* null_map = assert_cast<const vectorized::ColumnUInt8*>(
+                                                      nullable->get_null_map_column_ptr().get())
+                                                      ->get_data()
+                                                      .data();
+                    for (int i = 1; i < column->size(); i++) {
+                        if (null_map[i]) {
                             continue;
                         }
-                        indexs.push_back(row_num);
+                        indexs.push_back(i);
                     }
-                    for (auto filter : iter->second) {
-                        filter->insert_batch(column_nested, indexs);
-                    }
-
                 } else {
-                    for (auto filter : iter->second) {
-                        filter->insert_batch(column, it.second);
+                    for (int i = 1; i < column->size(); i++) {
+                        indexs.push_back(i);
                     }
                 }
+                for (auto* filter : iter->second) {
+                    filter->insert_batch(column, indexs);
+                }
             }
         }
     }
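Note (not part of the commit): the runtime-filter insert path changes because the build side now keeps a single merged block whose row 0 is a mocked row reserved by the join hash map. Instead of carrying per-block row-index lists, the sink only remembers which blocks were seen (an unordered_set), and the filter reads every real row, i.e. rows 1..size()-1, of the result column. A minimal sketch of the new convention, using hypothetical local names:

    // Collect the insertable row ids of one result column: skip the mocked
    // row 0 and any null rows, then feed one batch per runtime filter.
    std::vector<int> row_ids;
    for (int row = 1; row < column->size(); ++row) {
        if (null_map != nullptr && null_map[row]) {
            continue; // null keys never pass an IN/bloom runtime filter
        }
        row_ids.push_back(row);
    }
    for (auto* filter : filters) {
        filter->insert_batch(column, row_ids);
    }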
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp
index 1af98cd77aa..f18cab51c02 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.cpp
+++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp
@@ -39,10 +39,7 @@ Overload(Callables&&... callables) -> Overload<Callables...>;
HashJoinBuildSinkLocalState::HashJoinBuildSinkLocalState(DataSinkOperatorXBase*
parent,
RuntimeState* state)
- : JoinBuildSinkLocalState(parent, state),
- _build_block_idx(0),
- _build_side_mem_used(0),
- _build_side_last_mem_used(0) {}
+ : JoinBuildSinkLocalState(parent, state) {}
Status HashJoinBuildSinkLocalState::init(RuntimeState* state,
LocalSinkStateInfo& info) {
RETURN_IF_ERROR(JoinBuildSinkLocalState::init(state, info));
@@ -53,13 +50,7 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
_shared_state->join_op_variants = p._join_op_variants;
if (p._is_broadcast_join &&
state->enable_share_hash_table_for_broadcast_join()) {
- _shared_state->build_blocks = p._shared_hash_table_context->blocks;
- } else {
- _shared_state->build_blocks.reset(new
std::vector<vectorized::Block>());
- // avoid vector expand change block address.
- // one block can store 4g data, _build_blocks can store 128*4g data.
- // if probe data bigger than 512g, runtime filter maybe will core dump
when insert data.
-
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
+ _shared_state->build_block = p._shared_hash_table_context->block;
}
_shared_state->is_null_safe_eq_join = p._is_null_safe_eq_join;
_shared_state->store_null_in_hash_table = p._store_null_in_hash_table;
@@ -84,11 +75,6 @@ Status HashJoinBuildSinkLocalState::init(RuntimeState*
state, LocalSinkStateInfo
_shared_hash_table_dependency->block_writing();
p._shared_hashtable_controller->append_dependency(p.node_id(),
_shared_hash_table_dependency);
- } else if (p._is_broadcast_join) {
- // avoid vector expand change block address.
- // one block can store 4g data, _build_blocks can store 128*4g data.
- // if probe data bigger than 512g, runtime filter maybe will core dump
when insert data.
-
_shared_state->build_blocks->reserve(vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
}
_memory_usage_counter = ADD_LABEL_COUNTER(profile(), "MemoryUsage");
@@ -158,25 +144,24 @@ void
HashJoinBuildSinkLocalState::init_short_circuit_for_probe() {
_shared_state->short_circuit_for_probe =
(_shared_state->_has_null_in_build_side &&
p._join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!p._is_mark_join) ||
- (_shared_state->build_blocks->empty() && p._join_op ==
TJoinOp::INNER_JOIN &&
+ (!_shared_state->build_block && p._join_op == TJoinOp::INNER_JOIN
&&
!p._is_mark_join) ||
- (_shared_state->build_blocks->empty() && p._join_op ==
TJoinOp::LEFT_SEMI_JOIN &&
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::LEFT_SEMI_JOIN &&
!p._is_mark_join) ||
- (_shared_state->build_blocks->empty() && p._join_op ==
TJoinOp::RIGHT_OUTER_JOIN) ||
- (_shared_state->build_blocks->empty() && p._join_op ==
TJoinOp::RIGHT_SEMI_JOIN) ||
- (_shared_state->build_blocks->empty() && p._join_op ==
TJoinOp::RIGHT_ANTI_JOIN);
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::RIGHT_OUTER_JOIN) ||
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::RIGHT_SEMI_JOIN) ||
+ (!_shared_state->build_block && p._join_op ==
TJoinOp::RIGHT_ANTI_JOIN);
//when build table rows is 0 and not have other_join_conjunct and not
_is_mark_join and join type is one of
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN
//we could get the result is probe table + null-column(if need output)
_shared_state->empty_right_table_need_probe_dispose =
- (_shared_state->build_blocks->empty() &&
!p._have_other_join_conjunct &&
- !p._is_mark_join) &&
+ (!_shared_state->build_block && !p._have_other_join_conjunct &&
!p._is_mark_join) &&
(p._join_op == TJoinOp::LEFT_OUTER_JOIN || p._join_op ==
TJoinOp::FULL_OUTER_JOIN ||
p._join_op == TJoinOp::LEFT_ANTI_JOIN);
}
Status HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
- vectorized::Block&
block, uint8_t offset) {
+ vectorized::Block&
block) {
auto& p = _parent->cast<HashJoinBuildSinkOperatorX>();
SCOPED_TIMER(_build_table_timer);
size_t rows = block.rows();
@@ -209,29 +194,30 @@ Status
HashJoinBuildSinkLocalState::process_build_block(RuntimeState* state,
Status st = _dependency->extract_join_column<true>(block, null_map_val,
raw_ptrs, res_col_ids);
st = std::visit(
- Overload {
- [&](std::monostate& arg, auto has_null_value,
- auto short_circuit_for_null_in_build_side) -> Status {
- LOG(FATAL) << "FATAL: uninited hash table";
- __builtin_unreachable();
- return Status::OK();
- },
- [&](auto&& arg, auto has_null_value,
- auto short_circuit_for_null_in_build_side) -> Status {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- vectorized::ProcessHashTableBuild<HashTableCtxType,
-
HashJoinBuildSinkLocalState>
- hash_table_build_process(rows, block,
raw_ptrs, this,
- state->batch_size(),
offset, state);
- return hash_table_build_process
- .template run<has_null_value,
short_circuit_for_null_in_build_side>(
- arg,
- has_null_value ||
short_circuit_for_null_in_build_side
- ? &null_map_val->get_data()
- : nullptr,
-
&_shared_state->_has_null_in_build_side);
- }},
- *_shared_state->hash_table_variants,
+ Overload {[&](std::monostate& arg, auto join_op, auto
has_null_value,
+ auto short_circuit_for_null_in_build_side) -> Status
{
+ LOG(FATAL) << "FATAL: uninited hash table";
+ __builtin_unreachable();
+ return Status::OK();
+ },
+ [&](auto&& arg, auto&& join_op, auto has_null_value,
+ auto short_circuit_for_null_in_build_side) -> Status
{
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ using JoinOpType = std::decay_t<decltype(join_op)>;
+ vectorized::ProcessHashTableBuild<HashTableCtxType,
+
HashJoinBuildSinkLocalState>
+ hash_table_build_process(rows, block,
raw_ptrs, this,
+
state->batch_size(), state);
+ return hash_table_build_process
+ .template run<JoinOpType::value,
has_null_value,
+
short_circuit_for_null_in_build_side>(
+ arg,
+ has_null_value ||
short_circuit_for_null_in_build_side
+ ? &null_map_val->get_data()
+ : nullptr,
+
&_shared_state->_has_null_in_build_side);
+ }},
+ *_shared_state->hash_table_variants,
_shared_state->join_op_variants,
vectorized::make_bool_variant(_build_side_ignore_null),
vectorized::make_bool_variant(p._short_circuit_for_null_in_build_side));
@@ -323,7 +309,7 @@ void
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
}
return;
}
- if (!try_get_hash_map_context_fixed<PartitionedHashMap,
HashCRC32, RowRefListType>(
+ if (!try_get_hash_map_context_fixed<JoinFixedHashMap,
HashCRC32, RowRefListType>(
*_shared_state->hash_table_variants,
_build_expr_ctxs)) {
_shared_state->hash_table_variants
->emplace<vectorized::SerializedHashTableContext<RowRefListType>>();
@@ -333,16 +319,6 @@ void
HashJoinBuildSinkLocalState::_hash_table_init(RuntimeState* state) {
vectorized::make_bool_variant(p._have_other_join_conjunct));
DCHECK(!std::holds_alternative<std::monostate>(*_shared_state->hash_table_variants));
-
- std::visit(vectorized::Overload {[&](std::monostate& arg) {
- LOG(FATAL) << "FATAL: uninited hash
table";
- __builtin_unreachable();
- },
- [&](auto&& arg) {
-
arg.hash_table->set_partitioned_threshold(
-
state->partitioned_hash_join_rows_threshold());
- }},
- *_shared_state->hash_table_variants);
}
HashJoinBuildSinkOperatorX::HashJoinBuildSinkOperatorX(ObjectPool* pool, int
operator_id,
@@ -405,9 +381,6 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
SCOPED_TIMER(local_state.profile()->total_time_counter());
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
- // make one block for each 4 gigabytes
- constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
if (local_state._shared_state->_has_null_in_build_side) {
// TODO: if _has_null_in_build_side is true we should finish current
pipeline task.
DCHECK(state->enable_pipeline_exec());
@@ -420,52 +393,29 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
if (in_block->rows() != 0) {
SCOPED_TIMER(local_state._build_side_merge_block_timer);
+ if (local_state._build_side_mutable_block.empty()) {
+ RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(
+ *(in_block->create_same_struct_block(1, false))));
+ }
RETURN_IF_ERROR(local_state._build_side_mutable_block.merge(*in_block));
- }
-
- if (UNLIKELY(local_state._build_side_mem_used -
local_state._build_side_last_mem_used >
- BUILD_BLOCK_MAX_SIZE)) {
- if (local_state._shared_state->build_blocks->size() ==
- vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
- return Status::NotSupported(strings::Substitute(
- "data size of right table in hash join > $0",
- BUILD_BLOCK_MAX_SIZE *
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+ if (local_state._build_side_mutable_block.rows() >
+ std::numeric_limits<uint32_t>::max()) {
+ return Status::NotSupported(
+ "Hash join do not support build table rows"
+ " over:" +
+ std::to_string(std::numeric_limits<uint32_t>::max()));
}
- local_state._shared_state->build_blocks->emplace_back(
- local_state._build_side_mutable_block.to_block());
-
- COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
- .bytes());
-
- // TODO:: Rethink may we should do the process after we receive
all build blocks ?
- // which is better.
- RETURN_IF_ERROR(local_state.process_build_block(
- state,
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
- local_state._build_block_idx));
-
- local_state._build_side_mutable_block = vectorized::MutableBlock();
- ++local_state._build_block_idx;
- local_state._build_side_last_mem_used =
local_state._build_side_mem_used;
}
}
if (local_state._should_build_hash_table && source_state ==
SourceState::FINISHED) {
if (!local_state._build_side_mutable_block.empty()) {
- if (local_state._shared_state->build_blocks->size() ==
- vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
- return Status::NotSupported(strings::Substitute(
- "data size of right table in hash join > $0",
- BUILD_BLOCK_MAX_SIZE *
vectorized::HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
- }
- local_state._shared_state->build_blocks->emplace_back(
+ local_state._shared_state->build_block =
std::make_shared<vectorized::Block>(
local_state._build_side_mutable_block.to_block());
COUNTER_UPDATE(local_state._build_blocks_memory_usage,
-
(*local_state._shared_state->build_blocks)[local_state._build_block_idx]
- .bytes());
+ (*local_state._shared_state->build_block).bytes());
RETURN_IF_ERROR(local_state.process_build_block(
- state,
(*local_state._shared_state->build_blocks)[local_state._build_block_idx],
- local_state._build_block_idx));
+ state, (*local_state._shared_state->build_block)));
}
auto ret = std::visit(
Overload {[&](std::monostate&) -> Status {
@@ -556,7 +506,7 @@ Status HashJoinBuildSinkOperatorX::sink(RuntimeState*
state, vectorized::Block*
if (source_state == SourceState::FINISHED) {
// Since the comparison of null values is meaningless, null aware left
anti join should not output null
// when the build side is not empty.
- if (!local_state._shared_state->build_blocks->empty() &&
+ if (!local_state._shared_state->build_block &&
_join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
local_state._shared_state->probe_ignore_null = true;
}
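Note (not part of the commit): the 4 GB-per-block / 128-block scheme is gone; the whole build side is merged into one block. Two invariants replace it: the first row of the merged block is a default ("mocked") row created via create_same_struct_block(1, false), so real build rows start at index 1, and build row ids are uint32_t, so the build side is capped at std::numeric_limits<uint32_t>::max() rows. Simplified sketch of the accumulation step, with hypothetical variable names:

    // Seed the mutable block with one default row so that row id 0 can act
    // as the "no match" sentinel, then append the incoming block.
    if (build_mutable_block.empty()) {
        RETURN_IF_ERROR(build_mutable_block.merge(*in_block->create_same_struct_block(1, false)));
    }
    RETURN_IF_ERROR(build_mutable_block.merge(*in_block));
    if (build_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
        return Status::NotSupported("hash join build table has too many rows");
    }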
diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h
index 8ba6e2fba3b..34054a921dd 100644
--- a/be/src/pipeline/exec/hashjoin_build_sink.h
+++ b/be/src/pipeline/exec/hashjoin_build_sink.h
@@ -61,11 +61,11 @@ public:
     ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
     using Parent = HashJoinBuildSinkOperatorX;
     HashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state);
-    ~HashJoinBuildSinkLocalState() = default;
+    ~HashJoinBuildSinkLocalState() override = default;
     Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
     Status open(RuntimeState* state) override;
-    Status process_build_block(RuntimeState* state, vectorized::Block& block, uint8_t offset);
+    Status process_build_block(RuntimeState* state, vectorized::Block& block);
     void init_short_circuit_for_probe();
     HashJoinBuildSinkOperatorX* join_build() { return (HashJoinBuildSinkOperatorX*)_parent; }
@@ -95,7 +95,6 @@ protected:
     std::vector<IRuntimeFilter*> _runtime_filters;
     bool _should_build_hash_table = true;
-    uint8_t _build_block_idx = 0;
     int64_t _build_side_mem_used = 0;
     int64_t _build_side_last_mem_used = 0;
     vectorized::MutableBlock _build_side_mutable_block;
@@ -103,7 +102,7 @@ protected:
     bool _has_set_need_null_map_for_build = false;
     bool _build_side_ignore_null = false;
     size_t _build_rf_cardinality = 0;
-    std::unordered_map<const vectorized::Block*, std::vector<int>> _inserted_rows;
+    std::unordered_set<const vectorized::Block*> _inserted_blocks;
     std::shared_ptr<SharedHashTableDependency> _shared_hash_table_dependency;
     RuntimeProfile::Counter* _build_table_timer;
diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h
index 923f7dd7b94..181934e7b50 100644
--- a/be/src/pipeline/exec/hashjoin_probe_operator.h
+++ b/be/src/pipeline/exec/hashjoin_probe_operator.h
@@ -89,9 +89,7 @@ public:
     vectorized::DataTypes right_table_data_types();
     vectorized::DataTypes left_table_data_types();
     bool* has_null_in_build_side() { return &_shared_state->_has_null_in_build_side; }
-    std::shared_ptr<std::vector<vectorized::Block>> build_blocks() const {
-        return _shared_state->build_blocks;
-    }
+    std::shared_ptr<vectorized::Block> build_block() const { return _shared_state->build_block; }
private:
    void _prepare_probe_block();
diff --git a/be/src/pipeline/exec/set_sink_operator.cpp b/be/src/pipeline/exec/set_sink_operator.cpp
index 07205e6c8a2..a7d0cd3c0ed 100644
--- a/be/src/pipeline/exec/set_sink_operator.cpp
+++ b/be/src/pipeline/exec/set_sink_operator.cpp
@@ -60,8 +60,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, vectorized::Blo
COUNTER_UPDATE(local_state.rows_input_counter(),
(int64_t)in_block->rows());
auto& mem_used = local_state._shared_state->mem_used;
- auto& build_blocks = local_state._shared_state->build_blocks;
- auto& build_block_index = local_state._shared_state->build_block_index;
+ auto& build_block = local_state._shared_state->build_block;
auto& valid_element_in_hash_tbl =
local_state._shared_state->valid_element_in_hash_tbl;
if (in_block->rows() != 0) {
@@ -71,11 +70,9 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, vectorized::Blo
if (source_state == SourceState::FINISHED ||
local_state._mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
- build_blocks.emplace_back(local_state._mutable_block.to_block());
- RETURN_IF_ERROR(_process_build_block(local_state,
build_blocks[build_block_index],
- build_block_index, state));
+ build_block = local_state._mutable_block.to_block();
+ RETURN_IF_ERROR(_process_build_block(local_state, build_block, state));
local_state._mutable_block.clear();
- ++build_block_index;
if (source_state == SourceState::FINISHED) {
if constexpr (is_intersect) {
@@ -101,7 +98,7 @@ Status SetSinkOperatorX<is_intersect>::sink(RuntimeState*
state, vectorized::Blo
template <bool is_intersect>
Status SetSinkOperatorX<is_intersect>::_process_build_block(
- SetSinkLocalState<is_intersect>& local_state, vectorized::Block&
block, uint8_t offset,
+ SetSinkLocalState<is_intersect>& local_state, vectorized::Block& block,
RuntimeState* state) {
size_t rows = block.rows();
if (rows == 0) {
@@ -117,7 +114,7 @@ Status SetSinkOperatorX<is_intersect>::_process_build_block(
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
vectorized::HashTableBuild<HashTableCtxType, is_intersect>
- hash_table_build_process(&local_state, rows,
raw_ptrs, offset, state);
+ hash_table_build_process(&local_state, rows,
raw_ptrs, state);
static_cast<void>(hash_table_build_process(arg,
local_state._arena));
} else {
LOG(FATAL) << "FATAL: uninited hash table";
diff --git a/be/src/pipeline/exec/set_sink_operator.h b/be/src/pipeline/exec/set_sink_operator.h
index a1b9d8b7079..c3bf21c3cb9 100644
--- a/be/src/pipeline/exec/set_sink_operator.h
+++ b/be/src/pipeline/exec/set_sink_operator.h
@@ -120,7 +120,7 @@ private:
     friend struct HashTableBuild;
     Status _process_build_block(SetSinkLocalState<is_intersect>& local_state,
-                                vectorized::Block& block, uint8_t offset, RuntimeState* state);
+                                vectorized::Block& block, RuntimeState* state);
     Status _extract_build_column(SetSinkLocalState<is_intersect>& local_state,
                                  vectorized::Block& block, vectorized::ColumnRawPtrs& raw_ptrs);
diff --git a/be/src/pipeline/exec/set_source_operator.cpp b/be/src/pipeline/exec/set_source_operator.cpp
index e679274d92c..149721632ff 100644
--- a/be/src/pipeline/exec/set_source_operator.cpp
+++ b/be/src/pipeline/exec/set_source_operator.cpp
@@ -180,12 +180,12 @@ void SetSourceOperatorX<is_intersect>::_add_result_columns(
         SetSourceLocalState<is_intersect>& local_state, vectorized::RowRefListWithFlags& value,
         int& block_size) {
     auto& build_col_idx = local_state._shared_state->build_col_idx;
-    auto& build_blocks = local_state._shared_state->build_blocks;
+    auto& build_block = local_state._shared_state->build_block;
     auto it = value.begin();
     for (auto idx = build_col_idx.begin(); idx != build_col_idx.end(); ++idx) {
-        auto& column = *build_blocks[it->block_offset].get_by_position(idx->first).column;
-        if (local_state._mutable_cols[idx->second]->is_nullable() xor column.is_nullable()) {
+        auto& column = *build_block.get_by_position(idx->first).column;
+        if (local_state._mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) {
             DCHECK(local_state._mutable_cols[idx->second]->is_nullable());
             ((vectorized::ColumnNullable*)(local_state._mutable_cols[idx->second].get()))
                     ->insert_from_not_nullable(column, it->row_num);
diff --git a/be/src/pipeline/pipeline_x/dependency.h b/be/src/pipeline/pipeline_x/dependency.h
index 131198c4496..290df96cbad 100644
--- a/be/src/pipeline/pipeline_x/dependency.h
+++ b/be/src/pipeline/pipeline_x/dependency.h
@@ -620,7 +620,7 @@ struct HashJoinSharedState : public JoinSharedState {
std::make_shared<vectorized::HashTableVariants>();
const std::vector<TupleDescriptor*> build_side_child_desc;
size_t build_exprs_size = 0;
- std::shared_ptr<std::vector<vectorized::Block>> build_blocks = nullptr;
+ std::shared_ptr<vectorized::Block> build_block;
bool probe_ignore_null = false;
};
@@ -713,8 +713,7 @@ struct SetSharedState {
/// default init
//record memory during running
int64_t mem_used = 0;
- std::vector<vectorized::Block> build_blocks; // build to source
- int build_block_index = 0; // build to source
+ vectorized::Block build_block; // build to source
//record element size in hashtable
int64_t valid_element_in_hash_tbl = 0;
//first:column_id, could point to origin column or cast column
@@ -785,7 +784,7 @@ public:
return;
}
- if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32,
+ if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32,
vectorized::RowRefListWithFlags>(
*hash_table_variants, child_exprs_lists[0])) {
hash_table_variants->emplace<
diff --git a/be/src/vec/columns/column.h b/be/src/vec/columns/column.h
index cfb9163820f..ef9a28be1a8 100644
--- a/be/src/vec/columns/column.h
+++ b/be/src/vec/columns/column.h
@@ -238,10 +238,17 @@ public:
/// Appends a batch elements from other column with the same type
/// indices_begin + indices_end represent the row indices of column src
/// Warning:
- /// if *indices == -1 means the row is null, only use in outer join,
do not use in any other place
+ /// if *indices == -1 means the row is null
virtual void insert_indices_from(const IColumn& src, const int*
indices_begin,
const int* indices_end) = 0;
+ /// Appends a batch elements from other column with the same type
+ /// indices_begin + indices_end represent the row indices of column src
+ /// Warning:
+ /// if *indices == 0 means the row is null, only use in outer join,
do not use in any other place
+ virtual void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) = 0;
+
/// Appends data located in specified memory chunk if it is possible
(throws an exception if it cannot be implemented).
/// Is used to optimize some computations (in aggregation, for example).
/// Parameter length could be ignored if column values have fixed size.
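Note (not part of the commit): insert_indices_from_join takes uint32_t build row ids produced by the join probe; id 0 refers to the mocked first build row and means "no matching build row", so every implementation must materialize a default/null value for 0 and copy the source row otherwise. The container columns below (array/map/struct/object) all follow the same generic pattern, roughly:

    void insert_indices_from_join(const IColumn& src, const uint32_t* begin,
                                  const uint32_t* end) {
        for (const uint32_t* x = begin; x != end; ++x) {
            if (*x == 0) {
                insert_default();      // unmatched probe row (outer-join padding)
            } else {
                insert_from(src, *x);  // copy the matched build row
            }
        }
    }

The numeric and string columns specialize this with batched resize/memcpy instead of per-row inserts.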
diff --git a/be/src/vec/columns/column_array.cpp b/be/src/vec/columns/column_array.cpp
index ca3b6c12da5..2916ac83108 100644
--- a/be/src/vec/columns/column_array.cpp
+++ b/be/src/vec/columns/column_array.cpp
@@ -804,6 +804,17 @@ void ColumnArray::insert_indices_from(const IColumn& src,
const int* indices_beg
}
}
+void ColumnArray::insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
+ for (auto x = indices_begin; x != indices_end; ++x) {
+ if (*x == 0) {
+ ColumnArray::insert_default();
+ } else {
+ ColumnArray::insert_from(src, *x);
+ }
+ }
+}
+
ColumnPtr ColumnArray::replicate(const IColumn::Offsets& replicate_offsets)
const {
if (replicate_offsets.empty()) return clone_empty();
diff --git a/be/src/vec/columns/column_array.h b/be/src/vec/columns/column_array.h
index c37fb48ba52..46541c3bbb0 100644
--- a/be/src/vec/columns/column_array.h
+++ b/be/src/vec/columns/column_array.h
@@ -224,6 +224,9 @@ public:
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override;
+
void replace_column_data(const IColumn& rhs, size_t row, size_t self_row =
0) override {
DCHECK(size() > self_row);
const auto& r = assert_cast<const ColumnArray&>(rhs);
diff --git a/be/src/vec/columns/column_complex.h b/be/src/vec/columns/column_complex.h
index b25ae9f1577..b343a8f9cdb 100644
--- a/be/src/vec/columns/column_complex.h
+++ b/be/src/vec/columns/column_complex.h
@@ -199,6 +199,21 @@ public:
}
}
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override {
+ const Self& src_vec = assert_cast<const Self&>(src);
+ auto new_size = indices_end - indices_begin;
+
+ for (uint32_t i = 0; i < new_size; ++i) {
+ auto offset = *(indices_begin + i);
+ if (offset == 0) {
+ data.emplace_back(T {});
+ } else {
+ data.emplace_back(src_vec.get_element(offset));
+ }
+ }
+ }
+
void pop_back(size_t n) override { data.erase(data.end() - n, data.end());
}
// it's impossible to use ComplexType as key , so we don't have to
implement them
[[noreturn]] StringRef serialize_value_into_arena(size_t n, Arena& arena,
diff --git a/be/src/vec/columns/column_const.h b/be/src/vec/columns/column_const.h
index 307066a7ae9..280d2de8344 100644
--- a/be/src/vec/columns/column_const.h
+++ b/be/src/vec/columns/column_const.h
@@ -116,6 +116,11 @@ public:
s += (indices_end - indices_begin);
}
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override {
+ s += (indices_end - indices_begin);
+ }
+
void insert(const Field&) override { ++s; }
void insert_data(const char*, size_t) override { ++s; }
diff --git a/be/src/vec/columns/column_decimal.h b/be/src/vec/columns/column_decimal.h
index 6c1e8893a3d..e72e23bdcc7 100644
--- a/be/src/vec/columns/column_decimal.h
+++ b/be/src/vec/columns/column_decimal.h
@@ -132,6 +132,18 @@ public:
}
}
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override {
+ auto origin_size = size();
+ auto new_size = indices_end - indices_begin;
+ data.resize(origin_size + new_size);
+ const T* __restrict src_data = reinterpret_cast<const
T*>(src.get_raw_data().data);
+
+ for (uint32_t i = 0; i < new_size; ++i) {
+ data[origin_size + i] = src_data[indices_begin[i]];
+ }
+ }
+
void insert_many_fix_len_data(const char* data_ptr, size_t num) override;
void insert_many_raw_data(const char* pos, size_t num) override {
diff --git a/be/src/vec/columns/column_dictionary.h b/be/src/vec/columns/column_dictionary.h
index e00e5b425f8..1f107e629f4 100644
--- a/be/src/vec/columns/column_dictionary.h
+++ b/be/src/vec/columns/column_dictionary.h
@@ -82,6 +82,11 @@ public:
LOG(FATAL) << "insert_indices_from not supported in ColumnDictionary";
}
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override {
+ LOG(FATAL) << "insert_indices_from_join not supported in
ColumnDictionary";
+ }
+
void pop_back(size_t n) override { LOG(FATAL) << "pop_back not supported
in ColumnDictionary"; }
void update_hash_with_value(size_t n, SipHash& hash) const override {
diff --git a/be/src/vec/columns/column_fixed_length_object.h b/be/src/vec/columns/column_fixed_length_object.h
index dce6666f132..8a9075c3a94 100644
--- a/be/src/vec/columns/column_fixed_length_object.h
+++ b/be/src/vec/columns/column_fixed_length_object.h
@@ -105,6 +105,28 @@ public:
}
}
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override {
+ const Self& src_vec = assert_cast<const Self&>(src);
+ auto origin_size = size();
+ auto new_size = indices_end - indices_begin;
+ if (_item_size == 0) {
+ _item_size = src_vec._item_size;
+ }
+ DCHECK(_item_size == src_vec._item_size) << "dst and src should have
the same _item_size";
+ resize(origin_size + new_size);
+
+ for (uint32_t i = 0; i < new_size; ++i) {
+ auto offset = indices_begin[i];
+ if (offset) {
+ memcpy(&_data[(origin_size + i) * _item_size],
&src_vec._data[offset * _item_size],
+ _item_size);
+ } else {
+ memset(&_data[(origin_size + i) * _item_size], 0, _item_size);
+ }
+ }
+ }
+
void clear() override {
_data.clear();
_item_count = 0;
diff --git a/be/src/vec/columns/column_map.cpp b/be/src/vec/columns/column_map.cpp
index 47a41c3dcfe..60ce83edc10 100644
--- a/be/src/vec/columns/column_map.cpp
+++ b/be/src/vec/columns/column_map.cpp
@@ -196,6 +196,17 @@ void ColumnMap::insert_indices_from(const IColumn& src,
const int* indices_begin
}
}
+void ColumnMap::insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) {
+ for (auto x = indices_begin; x != indices_end; ++x) {
+ if (*x == 0) {
+ ColumnMap::insert_default();
+ } else {
+ ColumnMap::insert_from(src, *x);
+ }
+ }
+}
+
StringRef ColumnMap::serialize_value_into_arena(size_t n, Arena& arena, char
const*& begin) const {
size_t array_size = size_at(n);
size_t offset = offset_at(n);
diff --git a/be/src/vec/columns/column_map.h b/be/src/vec/columns/column_map.h
index 551b6f9bfd6..b99cb90ae9c 100644
--- a/be/src/vec/columns/column_map.h
+++ b/be/src/vec/columns/column_map.h
@@ -133,6 +133,9 @@ public:
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override;
+
void append_data_by_selector(MutableColumnPtr& res,
const IColumn::Selector& selector) const
override {
return append_data_by_selector_impl<ColumnMap>(res, selector);
diff --git a/be/src/vec/columns/column_nullable.cpp b/be/src/vec/columns/column_nullable.cpp
index 42b88ac7ae9..c3558b36d93 100644
--- a/be/src/vec/columns/column_nullable.cpp
+++ b/be/src/vec/columns/column_nullable.cpp
@@ -308,6 +308,16 @@ void ColumnNullable::insert_indices_from(const IColumn&
src, const int* indices_
_need_update_has_null = true;
}
+void ColumnNullable::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
+ const uint32_t* indices_end) {
+ const auto& src_concrete = assert_cast<const ColumnNullable&>(src);
+
get_nested_column().insert_indices_from_join(src_concrete.get_nested_column(),
indices_begin,
+ indices_end);
+
_get_null_map_column().insert_indices_from_join(src_concrete.get_null_map_column(),
+ indices_begin,
indices_end);
+ _need_update_has_null = true;
+}
+
void ColumnNullable::insert(const Field& x) {
if (x.is_null()) {
get_nested_column().insert_default();
diff --git a/be/src/vec/columns/column_nullable.h b/be/src/vec/columns/column_nullable.h
index 3b97e708d11..5fba17c3635 100644
--- a/be/src/vec/columns/column_nullable.h
+++ b/be/src/vec/columns/column_nullable.h
@@ -123,6 +123,9 @@ public:
void insert_range_from(const IColumn& src, size_t start, size_t length)
override;
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override;
+
void insert(const Field& x) override;
void insert_from(const IColumn& src, size_t n) override;
diff --git a/be/src/vec/columns/column_object.cpp b/be/src/vec/columns/column_object.cpp
index f3571c8ba29..d4ccab7f2e4 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -1002,4 +1002,15 @@ void ColumnObject::insert_indices_from(const IColumn&
src, const int* indices_be
});
}
+void ColumnObject::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
+ const uint32_t* indices_end) {
+ // insert_indices_from with alignment
+ const ColumnObject& src_column = *check_and_get_column<ColumnObject>(src);
+ align_variant_by_name_and_type(*this, src_column, indices_end -
indices_begin,
+ [indices_begin, indices_end](const IColumn&
src, IColumn* dst) {
+ dst->insert_indices_from_join(src,
indices_begin,
+
indices_end);
+ });
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/columns/column_object.h b/be/src/vec/columns/column_object.h
index 6bff69b1e67..179a70c4ccb 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -287,6 +287,9 @@ public:
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override;
+
// May throw execption
void try_insert(const Field& field);
diff --git a/be/src/vec/columns/column_string.cpp b/be/src/vec/columns/column_string.cpp
index 5d5abd64349..d205c7cd12e 100644
--- a/be/src/vec/columns/column_string.cpp
+++ b/be/src/vec/columns/column_string.cpp
@@ -161,6 +161,43 @@ void ColumnString::insert_indices_from(const IColumn& src,
const int* indices_be
}
}
+void ColumnString::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
+ const uint32_t* indices_end) {
+ const ColumnString& src_str = assert_cast<const ColumnString&>(src);
+ auto src_offset_data = src_str.offsets.data();
+
+ auto old_char_size = chars.size();
+ size_t total_chars_size = old_char_size;
+
+ auto dst_offsets_pos = offsets.size();
+ offsets.resize(offsets.size() + indices_end - indices_begin);
+ auto* dst_offsets_data = offsets.data();
+
+ for (auto x = indices_begin; x != indices_end; ++x) {
+ if (*x != 0) {
+ total_chars_size += src_offset_data[*x] - src_offset_data[*x - 1];
+ }
+ dst_offsets_data[dst_offsets_pos++] = total_chars_size;
+ }
+ check_chars_length(total_chars_size, offsets.size());
+
+ chars.resize(total_chars_size);
+
+ auto* src_data_ptr = src_str.chars.data();
+ auto* dst_data_ptr = chars.data();
+
+ size_t dst_chars_pos = old_char_size;
+ for (auto x = indices_begin; x != indices_end; ++x) {
+ if (*x != 0) {
+ const size_t size_to_append = src_offset_data[*x] -
src_offset_data[*x - 1];
+ const size_t offset = src_offset_data[*x - 1];
+ memcpy_small_allow_read_write_overflow15(dst_data_ptr +
dst_chars_pos,
+ src_data_ptr + offset,
size_to_append);
+ dst_chars_pos += size_to_append;
+ }
+ }
+}
+
void ColumnString::update_crcs_with_value(uint32_t* __restrict hashes,
doris::PrimitiveType type,
uint32_t rows, uint32_t offset,
const uint8_t* __restrict null_data)
const {
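Note (not part of the commit): ColumnString::insert_indices_from_join is a two-pass version of the same contract. Pass one walks the indices once to extend the offsets array (an id of 0 contributes zero bytes, so the row decodes as an empty default string) and to compute the final chars size; pass two resizes chars once and copies each matched row's bytes. In outline:

    // pass 1: offsets / total size
    for (const uint32_t* x = begin; x != end; ++x) {
        if (*x != 0) {
            total += src_offsets[*x] - src_offsets[*x - 1];
        }
        offsets_out.push_back(total);
    }
    // pass 2: one chars.resize(total), then one memcpy per matched row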
diff --git a/be/src/vec/columns/column_string.h b/be/src/vec/columns/column_string.h
index ae2bb9d25f9..b829eeaa377 100644
--- a/be/src/vec/columns/column_string.h
+++ b/be/src/vec/columns/column_string.h
@@ -475,6 +475,9 @@ public:
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override;
+
ColumnPtr filter(const Filter& filt, ssize_t result_size_hint) const
override;
size_t filter(const Filter& filter) override;
diff --git a/be/src/vec/columns/column_struct.cpp b/be/src/vec/columns/column_struct.cpp
index 93c62139498..3502fdf581a 100644
--- a/be/src/vec/columns/column_struct.cpp
+++ b/be/src/vec/columns/column_struct.cpp
@@ -233,6 +233,15 @@ void ColumnStruct::insert_indices_from(const IColumn& src,
const int* indices_be
}
}
+void ColumnStruct::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
+ const uint32_t* indices_end) {
+ const ColumnStruct& src_concrete = assert_cast<const ColumnStruct&>(src);
+ for (size_t i = 0; i < columns.size(); ++i) {
+ columns[i]->insert_indices_from_join(src_concrete.get_column(i),
indices_begin,
+ indices_end);
+ }
+}
+
void ColumnStruct::insert_range_from(const IColumn& src, size_t start, size_t
length) {
const size_t tuple_size = columns.size();
for (size_t i = 0; i < tuple_size; ++i) {
diff --git a/be/src/vec/columns/column_struct.h b/be/src/vec/columns/column_struct.h
index 36854824198..b1dd0ba795f 100644
--- a/be/src/vec/columns/column_struct.h
+++ b/be/src/vec/columns/column_struct.h
@@ -125,6 +125,9 @@ public:
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override;
+
void get_permutation(bool reverse, size_t limit, int nan_direction_hint,
Permutation& res) const override {
LOG(FATAL) << "get_permutation not implemented";
diff --git a/be/src/vec/columns/column_vector.cpp b/be/src/vec/columns/column_vector.cpp
index 744e74b4843..4575d089781 100644
--- a/be/src/vec/columns/column_vector.cpp
+++ b/be/src/vec/columns/column_vector.cpp
@@ -388,6 +388,29 @@ void ColumnVector<T>::insert_indices_from(const IColumn&
src, const int* indices
}
}
+template <typename T>
+void ColumnVector<T>::insert_indices_from_join(const IColumn& src, const
uint32_t* indices_begin,
+ const uint32_t* indices_end) {
+ auto origin_size = size();
+ auto new_size = indices_end - indices_begin;
+ data.resize(origin_size + new_size);
+
+ const T* __restrict src_data = reinterpret_cast<const
T*>(src.get_raw_data().data);
+
+ if constexpr (std::is_same_v<T, UInt8>) {
+ // nullmap : indices_begin[i] == 0 means is null at the here, set true
here
+ for (uint32_t i = 0; i < new_size; ++i) {
+ data[origin_size + i] =
+ (indices_begin[i] == 0) + (indices_begin[i] != 0) *
src_data[indices_begin[i]];
+ }
+ } else {
+ // real data : indices_begin[i] == 0 what at is meaningless
+ for (uint32_t i = 0; i < new_size; ++i) {
+ data[origin_size + i] = src_data[indices_begin[i]];
+ }
+ }
+}
+
template <typename T>
ColumnPtr ColumnVector<T>::filter(const IColumn::Filter& filt, ssize_t
result_size_hint) const {
size_t size = data.size();
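Note (not part of the commit): in ColumnVector<T>::insert_indices_from_join, the UInt8 specialization is the null-map case. A build id of 0 (no match) must read back as NULL, so

    data[origin_size + i] = (indices_begin[i] == 0) + (indices_begin[i] != 0) * src_data[indices_begin[i]];

is a branch-free spelling of

    data[origin_size + i] = (indices_begin[i] == 0) ? 1 : src_data[indices_begin[i]];

i.e. unmatched rows are marked null and matched rows copy the source null flag. Reading src_data[0] in the unmatched case is safe because the mocked row 0 always exists in the build block.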
diff --git a/be/src/vec/columns/column_vector.h b/be/src/vec/columns/column_vector.h
index 85d64740d19..f795a1057ab 100644
--- a/be/src/vec/columns/column_vector.h
+++ b/be/src/vec/columns/column_vector.h
@@ -382,6 +382,8 @@ public:
void insert_indices_from(const IColumn& src, const int* indices_begin,
const int* indices_end) override;
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override;
void fill(const value_type& element, size_t num) {
auto old_size = data.size();
auto new_size = old_size + num;
diff --git a/be/src/vec/columns/predicate_column.h b/be/src/vec/columns/predicate_column.h
index 3aec3b1540f..ce5101c8030 100644
--- a/be/src/vec/columns/predicate_column.h
+++ b/be/src/vec/columns/predicate_column.h
@@ -166,6 +166,11 @@ public:
LOG(FATAL) << "insert_indices_from not supported in
PredicateColumnType";
}
+ void insert_indices_from_join(const IColumn& src, const uint32_t*
indices_begin,
+ const uint32_t* indices_end) override {
+ LOG(FATAL) << "insert_indices_from_join not supported in
PredicateColumnType";
+ }
+
void pop_back(size_t n) override {
LOG(FATAL) << "pop_back not supported in PredicateColumnType";
}
diff --git a/be/src/vec/common/hash_table/hash_map.h b/be/src/vec/common/hash_table/hash_map.h
index 5b7cd6f4642..c26234b4e22 100644
--- a/be/src/vec/common/hash_table/hash_map.h
+++ b/be/src/vec/common/hash_table/hash_map.h
@@ -20,6 +20,8 @@
#pragma once
+#include <span>
+
#include "vec/common/hash_table/hash.h"
#include "vec/common/hash_table/hash_table.h"
#include "vec/common/hash_table/hash_table_allocator.h"
@@ -193,10 +195,222 @@ public:
bool has_null_key_data() const { return false; }
};
+template <typename Key, typename Cell, typename Hash = DefaultHash<Key>,
+ typename Grower = HashTableGrower<>, typename Allocator =
HashTableAllocator>
+class JoinHashMapTable : public HashMapTable<Key, Cell, Hash, Grower,
Allocator> {
+public:
+ using Self = JoinHashMapTable;
+ using Base = HashMapTable<Key, Cell, Hash, Grower, Allocator>;
+
+ using key_type = Key;
+ using value_type = typename Cell::value_type;
+ using mapped_type = typename Cell::Mapped;
+
+ using LookupResult = typename Base::LookupResult;
+
+ using HashMapTable<Key, Cell, Hash, Grower, Allocator>::HashMapTable;
+
+ static uint32_t calc_bucket_size(size_t num_elem) {
+ size_t expect_bucket_size = static_cast<size_t>(num_elem) + (num_elem
- 1) / 7;
+ return phmap::priv::NormalizeCapacity(expect_bucket_size) + 1;
+ }
+
+ template <int JoinOpType>
+ void prepare_build(size_t num_elem, int batch_size) {
+ max_batch_size = batch_size;
+ bucket_size = calc_bucket_size(num_elem + 1);
+ first.resize(bucket_size, 0);
+ next.resize(num_elem);
+
+ if constexpr (JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+ JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN ||
+ JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+ JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+ visited.resize(num_elem, 0);
+ }
+ }
+
+ uint32_t get_bucket_size() const { return bucket_size; }
+
+ void build(const Key* __restrict keys, const uint32_t* __restrict
bucket_nums,
+ size_t num_elem) {
+ build_keys = keys;
+ for (size_t i = 1; i < num_elem; i++) {
+ uint32_t bucket_num = bucket_nums[i];
+ next[i] = first[bucket_num];
+ first[bucket_num] = i;
+ }
+ }
+
+ template <int JoinOpType>
+ auto find_batch(const Key* __restrict keys, const uint32_t* __restrict
bucket_nums,
+ int probe_idx, int probe_rows, uint32_t* __restrict
probe_idxs,
+ uint32_t* __restrict build_idxs) {
+ if constexpr (JoinOpType == doris::TJoinOp::INNER_JOIN ||
+ JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN ||
+ JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+ JoinOpType == doris::TJoinOp::RIGHT_OUTER_JOIN) {
+ return _find_batch_inner_outer_join<JoinOpType>(keys, bucket_nums,
probe_idx,
+ probe_rows,
probe_idxs, build_idxs);
+ }
+ if constexpr (JoinOpType == doris::TJoinOp::LEFT_ANTI_JOIN ||
+ JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN) {
+ return _find_batch_left_semi_anti<JoinOpType>(keys, bucket_nums,
probe_idx, probe_rows,
+ probe_idxs);
+ }
+ if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN ||
+ JoinOpType == doris::TJoinOp::RIGHT_SEMI_JOIN) {
+ return _find_batch_right_semi_anti(keys, bucket_nums, probe_idx,
probe_rows);
+ }
+ return std::pair {0, 0};
+ }
+
+ template <int JoinOpType>
+ bool iterate_map(std::vector<uint32_t>& build_idxs) const {
+ const auto batch_size = max_batch_size;
+ const auto elem_num = visited.size();
+ int count = 0;
+ build_idxs.resize(batch_size);
+
+ while (count < batch_size && iter_idx < elem_num) {
+ const auto matched = visited[iter_idx];
+ build_idxs[count] = iter_idx;
+ if constexpr (JoinOpType == doris::TJoinOp::RIGHT_ANTI_JOIN) {
+ count += !matched;
+ } else {
+ count += matched;
+ }
+ iter_idx++;
+ }
+
+ build_idxs.resize(count);
+ return iter_idx >= elem_num;
+ }
+
+private:
+ auto _find_batch_right_semi_anti(const Key* __restrict keys,
+ const uint32_t* __restrict bucket_nums,
int probe_idx,
+ int probe_rows) {
+ while (LIKELY(probe_idx < probe_rows)) {
+ auto build_idx = first[bucket_nums[probe_idx]];
+
+ while (build_idx) {
+ if (keys[probe_idx] == build_keys[build_idx]) {
+ visited[build_idx] = 1;
+ }
+ build_idx = next[build_idx];
+ }
+ probe_idx++;
+ }
+ return std::pair {probe_idx, 0};
+ }
+
+ template <int JoinOpType>
+ auto _find_batch_left_semi_anti(const Key* __restrict keys,
+ const uint32_t* __restrict bucket_nums,
int probe_idx,
+ int probe_rows, uint32_t* __restrict
probe_idxs) {
+ auto matched_cnt = 0;
+ const auto batch_size = max_batch_size;
+
+ while (probe_idx < probe_rows && matched_cnt < batch_size) {
+ uint32_t bucket_num = bucket_nums[probe_idx];
+ auto build_idx = first[bucket_num];
+
+ while (build_idx) {
+ if (keys[probe_idx] == build_keys[build_idx]) {
+ break;
+ }
+ build_idx = next[build_idx];
+ }
+ bool matched =
+ JoinOpType == doris::TJoinOp::LEFT_SEMI_JOIN ? build_idx
!= 0 : build_idx == 0;
+ matched_cnt += matched;
+ probe_idxs[matched_cnt - matched] = probe_idx++;
+ }
+ return std::pair {probe_idx, matched_cnt};
+ }
+
+ template <int JoinOpType>
+ auto _find_batch_inner_outer_join(const Key* __restrict keys,
+ const uint32_t* __restrict bucket_nums,
int probe_idx,
+ int probe_rows, uint32_t* __restrict
probe_idxs,
+ uint32_t* __restrict build_idxs) {
+ auto matched_cnt = 0;
+ const auto batch_size = max_batch_size;
+ size_t build_idx = 0;
+
+ auto do_the_probe = [&]() {
+ while (build_idx && matched_cnt < batch_size) {
+ if (keys[probe_idx] == build_keys[build_idx]) {
+ probe_idxs[matched_cnt] = probe_idx;
+ build_idxs[matched_cnt] = build_idx;
+ if constexpr (JoinOpType ==
doris::TJoinOp::RIGHT_OUTER_JOIN ||
+ JoinOpType ==
doris::TJoinOp::FULL_OUTER_JOIN) {
+ visited[build_idx] = 1;
+ }
+ matched_cnt++;
+ }
+ build_idx = next[build_idx];
+ }
+
+ if constexpr (JoinOpType == doris::TJoinOp::LEFT_OUTER_JOIN ||
+ JoinOpType == doris::TJoinOp::FULL_OUTER_JOIN) {
+ // `(!matched_cnt || probe_idxs[matched_cnt - 1] !=
probe_idx)` means not match one build side
+ if (!matched_cnt || probe_idxs[matched_cnt - 1] != probe_idx) {
+ probe_idxs[matched_cnt] = probe_idx;
+ build_idxs[matched_cnt] = 0;
+ matched_cnt++;
+ }
+ }
+ probe_idx++;
+ };
+
+ if (probe_idx == current_probe_idx) {
+ current_probe_idx = -1;
+ build_idx = current_build_idx;
+ current_build_idx = 0;
+ do_the_probe();
+ }
+
+ while (probe_idx < probe_rows && matched_cnt < batch_size) {
+ uint32_t bucket_num = bucket_nums[probe_idx];
+ build_idx = first[bucket_num];
+ do_the_probe();
+ }
+
+ if (matched_cnt == batch_size && build_idx) {
+ probe_idx--;
+ current_probe_idx = probe_idx;
+ current_build_idx = build_idx;
+ }
+ return std::pair {probe_idx, matched_cnt};
+ }
+
+ const Key* __restrict build_keys;
+ std::vector<uint8_t> visited;
+
+ uint32_t bucket_size = 0;
+ int max_batch_size = 0;
+
+ int current_probe_idx = -1;
+ uint32_t current_build_idx = 0;
+
+ std::vector<uint32_t> first;
+ std::vector<uint32_t> next;
+
+ // use in iter hash map
+ mutable uint32_t iter_idx = 1;
+ Cell cell;
+ doris::vectorized::Arena* pool;
+};
+
template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
typename Grower = HashTableGrower<>, typename Allocator =
HashTableAllocator>
using HashMap = HashMapTable<Key, HashMapCell<Key, Mapped, Hash>, Hash,
Grower, Allocator>;
+template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>>
+using JoinFixedHashMap = JoinHashMapTable<Key, HashMapCell<Key, Mapped, Hash>,
Hash>;
+
template <typename Key, typename Mapped, typename Hash = DefaultHash<Key>,
typename Grower = HashTableGrower<>, typename Allocator =
HashTableAllocator>
using HashMapWithSavedHash =
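Note (not part of the commit): JoinHashMapTable replaces the open-addressing PartitionedHashMap on the join path with a classic bucket-chain layout. calc_bucket_size() rounds the element count up to a power of two (phmap NormalizeCapacity + 1), first[bucket] stores the most recent build row hashing to that bucket, next[row] links older rows of the same bucket, and row id 0 doubles as the empty-bucket / end-of-chain / "no match" sentinel, which is why the build block carries a mocked row 0. A stripped-down, single-threaded sketch of the idea (no batching, no outer-join bookkeeping):

    #include <cstdint>
    #include <vector>

    // Minimal chained hash table in the spirit of JoinHashMapTable.
    struct ChainedJoinTable {
        std::vector<uint32_t> first; // bucket -> head build row id, 0 == empty
        std::vector<uint32_t> next;  // build row id -> previous row in the same bucket
        uint32_t mask = 0;           // bucket_size - 1, bucket_size is a power of two

        void prepare(uint32_t bucket_size, uint32_t num_rows) {
            mask = bucket_size - 1;
            first.assign(bucket_size, 0);
            next.assign(num_rows, 0);
        }

        void build(const std::vector<uint32_t>& bucket_nums) {
            // row 0 is the mocked row and is intentionally never linked in
            for (uint32_t row = 1; row < bucket_nums.size(); ++row) {
                next[row] = first[bucket_nums[row]];
                first[bucket_nums[row]] = row;
            }
        }

        template <typename Eq>
        uint32_t probe_one(uint32_t bucket_num, Eq&& equal) const {
            for (uint32_t row = first[bucket_num]; row != 0; row = next[row]) {
                if (equal(row)) {
                    return row; // first matching build row; 0 means "no match"
                }
            }
            return 0;
        }
    };

In the committed code, find_batch additionally resumes across calls via current_probe_idx / current_build_idx when a probe batch fills up, and the visited bitmap drives the right outer/semi/anti output in iterate_map.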
diff --git a/be/src/vec/common/hash_table/hash_map_context.h b/be/src/vec/common/hash_table/hash_map_context.h
index 0464380b18d..4c85cc74b3e 100644
--- a/be/src/vec/common/hash_table/hash_map_context.h
+++ b/be/src/vec/common/hash_table/hash_map_context.h
@@ -54,6 +54,9 @@ struct MethodBase {
Arena arena;
std::vector<size_t> hash_values;
+ // use in join case
+ std::vector<uint32_t> bucket_nums;
+
MethodBase() { hash_table.reset(new HashMap()); }
virtual ~MethodBase() = default;
@@ -68,8 +71,32 @@ struct MethodBase {
iterator = hash_table->begin();
}
}
+
virtual void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
- const uint8_t* null_map = nullptr) = 0;
+ const uint8_t* null_map = nullptr, bool
is_join = false,
+ bool is_build = false, uint32_t
bucket_size = 0) = 0;
+
+ void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size, const
uint8_t* null_map) {
+ bucket_nums.resize(num_rows);
+
+ if (null_map == nullptr) {
+ init_join_bucket_num(num_rows, bucket_size);
+ return;
+ }
+ for (uint32_t k = 0; k < num_rows; ++k) {
+ if (null_map[k]) {
+ continue;
+ }
+
+ bucket_nums[k] = hash_table->hash(keys[k]) & (bucket_size - 1);
+ }
+ }
+
+ void init_join_bucket_num(uint32_t num_rows, uint32_t bucket_size) {
+ for (uint32_t k = 0; k < num_rows; ++k) {
+ bucket_nums[k] = hash_table->hash(keys[k]) & (bucket_size - 1);
+ }
+ }
void init_hash_values(size_t num_rows, const uint8_t* null_map) {
if (null_map == nullptr) {
@@ -85,6 +112,7 @@ struct MethodBase {
hash_values[k] = hash_table->hash(keys[k]);
}
}
+
void init_hash_values(size_t num_rows) {
hash_values.resize(num_rows);
for (size_t k = 0; k < num_rows; ++k) {
@@ -145,7 +173,10 @@ struct MethodSerialized : public MethodBase<TData> {
using Base::init_iterator;
using State = ColumnsHashing::HashMethodSerialized<typename Base::Value,
typename Base::Mapped>;
using Base::try_presis_key;
-
+ // need keep until the hash probe end.
+ std::vector<StringRef> build_stored_keys;
+ Arena build_arena;
+ // refresh each time probe
std::vector<StringRef> stored_keys;
StringRef serialize_keys_to_pool_contiguous(size_t i, size_t keys_size,
@@ -160,40 +191,48 @@ struct MethodSerialized : public MethodBase<TData> {
return {begin, sum_size};
}
- void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
- const uint8_t* null_map = nullptr) override {
- Base::arena.clear();
- stored_keys.resize(num_rows);
+ void init_serialized_keys_impl(const ColumnRawPtrs& key_columns, size_t
num_rows,
+ std::vector<StringRef>& keys, Arena& arena)
{
+ arena.clear();
+ keys.resize(num_rows);
size_t max_one_row_byte_size = 0;
for (const auto& column : key_columns) {
max_one_row_byte_size += column->get_max_row_byte_size();
}
size_t total_bytes = max_one_row_byte_size * num_rows;
-
if (total_bytes > config::pre_serialize_keys_limit_bytes) {
// reach mem limit, don't serialize in batch
size_t keys_size = key_columns.size();
for (size_t i = 0; i < num_rows; ++i) {
- stored_keys[i] =
- serialize_keys_to_pool_contiguous(i, keys_size,
key_columns, Base::arena);
+ keys[i] = serialize_keys_to_pool_contiguous(i, keys_size,
key_columns, arena);
}
} else {
- auto* serialized_key_buffer =
- reinterpret_cast<uint8_t*>(Base::arena.alloc(total_bytes));
+ auto* serialized_key_buffer =
reinterpret_cast<uint8_t*>(arena.alloc(total_bytes));
for (size_t i = 0; i < num_rows; ++i) {
- stored_keys[i].data =
+ keys[i].data =
reinterpret_cast<char*>(serialized_key_buffer + i *
max_one_row_byte_size);
- stored_keys[i].size = 0;
+ keys[i].size = 0;
}
for (const auto& column : key_columns) {
- column->serialize_vec(stored_keys, num_rows,
max_one_row_byte_size);
+ column->serialize_vec(keys, num_rows, max_one_row_byte_size);
}
}
- Base::keys = stored_keys.data();
- Base::init_hash_values(num_rows, null_map);
+ Base::keys = keys.data();
+ }
+
+ void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
+ const uint8_t* null_map = nullptr, bool is_join
= false,
+ bool is_build = false, uint32_t bucket_size = 0)
override {
+ init_serialized_keys_impl(key_columns, num_rows, is_build ?
build_stored_keys : stored_keys,
+ is_build ? build_arena : Base::arena);
+ if (is_join) {
+ Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+ } else {
+ Base::init_hash_values(num_rows, null_map);
+ }
}
void insert_keys_into_columns(std::vector<StringRef>& keys,
MutableColumns& key_columns,
@@ -219,7 +258,8 @@ struct MethodStringNoCache : public MethodBase<TData> {
std::vector<StringRef> stored_keys;
void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
- const uint8_t* null_map = nullptr) override {
+ const uint8_t* null_map = nullptr, bool is_join
= false,
+ bool is_build = false, uint32_t bucket_size = 0)
override {
const IColumn& column = *key_columns[0];
const auto& column_string = assert_cast<const ColumnString&>(
column.is_nullable()
@@ -234,7 +274,11 @@ struct MethodStringNoCache : public MethodBase<TData> {
}
Base::keys = stored_keys.data();
- Base::init_hash_values(num_rows, null_map);
+ if (is_join) {
+ Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+ } else {
+ Base::init_hash_values(num_rows, null_map);
+ }
}
void insert_keys_into_columns(std::vector<StringRef>& keys,
MutableColumns& key_columns,
@@ -255,7 +299,8 @@ struct MethodOneNumber : public MethodBase<TData> {
FieldType>;
void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
- const uint8_t* null_map = nullptr) override {
+ const uint8_t* null_map = nullptr, bool is_join
= false,
+ bool is_build = false, uint32_t bucket_size = 0)
override {
Base::keys = (FieldType*)(key_columns[0]->is_nullable()
? assert_cast<const
ColumnNullable*>(key_columns[0])
->get_nested_column_ptr()
@@ -263,7 +308,11 @@ struct MethodOneNumber : public MethodBase<TData> {
->get_raw_data()
.data;
std::string name = key_columns[0]->get_name();
- Base::init_hash_values(num_rows, null_map);
+ if (is_join) {
+ Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+ } else {
+ Base::init_hash_values(num_rows, null_map);
+ }
}
void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
@@ -289,17 +338,20 @@ struct MethodKeysFixed : public MethodBase<TData> {
using State = ColumnsHashing::HashMethodKeysFixed<typename Base::Value,
Key, Mapped,
has_nullable_keys>;
+ // need keep until the hash probe end. use only in join
+ std::vector<Key> build_stored_keys;
+ // refresh each time probe hash table
std::vector<Key> stored_keys;
Sizes key_sizes;
MethodKeysFixed(Sizes key_sizes_) : key_sizes(std::move(key_sizes_)) {}
template <typename T>
- std::vector<T> pack_fixeds(size_t row_numbers, const ColumnRawPtrs&
key_columns,
- const ColumnRawPtrs& nullmap_columns) {
+ void pack_fixeds(size_t row_numbers, const ColumnRawPtrs& key_columns,
+ const ColumnRawPtrs& nullmap_columns, std::vector<T>&
result) {
size_t bitmap_size = get_bitmap_size(nullmap_columns.size());
+ result.resize(row_numbers);
- std::vector<T> result(row_numbers);
size_t offset = 0;
if (bitmap_size > 0) {
for (size_t j = 0; j < nullmap_columns.size(); j++) {
@@ -353,11 +405,11 @@ struct MethodKeysFixed : public MethodBase<TData> {
}
offset += key_sizes[j];
}
- return result;
}
void init_serialized_keys(const ColumnRawPtrs& key_columns, size_t
num_rows,
- const uint8_t* null_map = nullptr) override {
+ const uint8_t* null_map = nullptr, bool is_join
= false,
+ bool is_build = false, uint32_t bucket_size = 0)
override {
ColumnRawPtrs actual_columns;
ColumnRawPtrs null_maps;
if (has_nullable_keys) {
@@ -375,9 +427,20 @@ struct MethodKeysFixed : public MethodBase<TData> {
} else {
actual_columns = key_columns;
}
- stored_keys = pack_fixeds<Key>(num_rows, actual_columns, null_maps);
- Base::keys = stored_keys.data();
- Base::init_hash_values(num_rows, null_map);
+
+ if (is_build) {
+ pack_fixeds<Key>(num_rows, actual_columns, null_maps,
build_stored_keys);
+ Base::keys = build_stored_keys.data();
+ } else {
+ pack_fixeds<Key>(num_rows, actual_columns, null_maps, stored_keys);
+ Base::keys = stored_keys.data();
+ }
+
+ if (is_join) {
+ Base::init_join_bucket_num(num_rows, bucket_size, null_map);
+ } else {
+ Base::init_hash_values(num_rows, null_map);
+ }
}
void insert_keys_into_columns(std::vector<typename Base::Key>& keys,
@@ -485,14 +548,14 @@ struct MethodSingleNullableColumn : public
SingleColumnMethod {
#endif
template <typename RowRefListType>
-using SerializedHashTableContext =
MethodSerialized<PartitionedHashMap<StringRef, RowRefListType>>;
+using SerializedHashTableContext =
MethodSerialized<JoinFixedHashMap<StringRef, RowRefListType>>;
template <class T, typename RowRefListType>
using PrimaryTypeHashTableContext =
- MethodOneNumber<T, PartitionedHashMap<T, RowRefListType,
HashCRC32<T>>>;
+ MethodOneNumber<T, JoinFixedHashMap<T, RowRefListType, HashCRC32<T>>>;
template <class Key, bool has_null, typename Value>
using FixedKeyHashTableContext =
- MethodKeysFixed<PartitionedHashMap<Key, Value, HashCRC32<Key>>,
has_null>;
+ MethodKeysFixed<JoinFixedHashMap<Key, Value, HashCRC32<Key>>,
has_null>;
} // namespace doris::vectorized
\ No newline at end of file
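The new is_join/is_build/bucket_size parameters on init_serialized_keys route key preparation either to plain hash values (the aggregation path) or to bucket numbers for the fixed-size join hash map. The diff does not show init_join_bucket_num itself; the sketch below is a minimal standalone approximation under the assumption that bucket_size is a power of two and that null rows are parked on the mocked row-0 bucket. The helper names (crc32_hash, compute_bucket_nums) are illustrative, not the Doris API.

    #include <cstdint>
    #include <vector>

    // Illustrative stand-in for HashCRC32; only the shape matters here.
    static uint32_t crc32_hash(uint64_t key) {
        key ^= key >> 33;
        key *= 0xff51afd7ed558ccdULL;
        key ^= key >> 33;
        return static_cast<uint32_t>(key);
    }

    // Sketch of what init_join_bucket_num is assumed to do: map every row's key
    // hash into [0, bucket_size), with bucket_size a power of two. In this sketch
    // null rows fall into bucket 0 (the mocked first build row); whether the real
    // code handles nulls here or earlier is not visible in the diff.
    std::vector<uint32_t> compute_bucket_nums(const std::vector<uint64_t>& keys,
                                              uint32_t bucket_size,
                                              const uint8_t* null_map) {
        std::vector<uint32_t> bucket_nums(keys.size());
        for (size_t i = 0; i < keys.size(); ++i) {
            if (null_map && null_map[i]) {
                bucket_nums[i] = 0; // sentinel bucket for null keys
            } else {
                bucket_nums[i] = crc32_hash(keys[i]) & (bucket_size - 1);
            }
        }
        return bucket_nums;
    }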
diff --git a/be/src/vec/common/hash_table/hash_table.h
b/be/src/vec/common/hash_table/hash_table.h
index 39ee5ec9e0a..7287f2e0276 100644
--- a/be/src/vec/common/hash_table/hash_table.h
+++ b/be/src/vec/common/hash_table/hash_table.h
@@ -441,7 +441,6 @@ protected:
Cell* buf {nullptr}; /// A piece of memory for all elements except the
element with zero key.
Grower grower;
int64_t _resize_timer_ns;
-
// the bucket count threshold above which it's converted to partitioned hash
table
// > 0: enable convert dynamically
// 0: convert is disabled
diff --git a/be/src/vec/common/hash_table/hash_table_set_build.h
b/be/src/vec/common/hash_table/hash_table_set_build.h
index e3c1ed27b1f..48c9e2d1673 100644
--- a/be/src/vec/common/hash_table/hash_table_set_build.h
+++ b/be/src/vec/common/hash_table/hash_table_set_build.h
@@ -24,11 +24,9 @@ namespace doris::vectorized {
template <class HashTableContext, bool is_intersect>
struct HashTableBuild {
template <typename Parent>
- HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs,
uint8_t offset,
- RuntimeState* state)
+ HashTableBuild(Parent* parent, int rows, ColumnRawPtrs& build_raw_ptrs,
RuntimeState* state)
: _mem_used(parent->mem_used()),
_rows(rows),
- _offset(offset),
_build_raw_ptrs(build_raw_ptrs),
_state(state) {}
@@ -48,9 +46,9 @@ struct HashTableBuild {
size_t k = 0;
auto creator = [&](const auto& ctor, auto& key, auto& origin) {
HashTableContext::try_presis_key(key, origin, arena);
- ctor(key, Mapped {k, _offset});
+ ctor(key, Mapped {k});
};
- auto creator_for_null_key = [&](auto& mapped) { mapped = {k, _offset};
};
+ auto creator_for_null_key = [&](auto& mapped) { mapped = {k}; };
for (; k < _rows; ++k) {
if (k % CHECK_FRECUENCY == 0) {
@@ -64,7 +62,6 @@ struct HashTableBuild {
private:
int64_t* _mem_used;
const int _rows;
- const uint8_t _offset;
ColumnRawPtrs& _build_raw_ptrs;
RuntimeState* _state;
};
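Since the build side is now a single Block, the mapped value carries only the row number; the _offset field and the {k, _offset} constructor disappear from the set build path. A toy sketch of the resulting build-loop shape, with std::unordered_map standing in for the Doris hash table and MappedRow for the real mapped type:

    #include <cstdint>
    #include <string>
    #include <unordered_map>
    #include <vector>

    // Stand-in for the mapped value after the change: only the row number,
    // no block offset, because all build rows live in one Block.
    struct MappedRow {
        uint32_t row_num = 0;
    };

    // Same shape as the creator lambdas above: emplace each key with its row index.
    void build_set(const std::vector<std::string>& keys,
                   std::unordered_map<std::string, MappedRow>& table) {
        for (uint32_t k = 0; k < keys.size(); ++k) {
            table.try_emplace(keys[k], MappedRow{k});
        }
    }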
diff --git a/be/src/vec/exec/join/join_op.h b/be/src/vec/exec/join/join_op.h
index 1b8b8f2c695..858f5197b03 100644
--- a/be/src/vec/exec/join/join_op.h
+++ b/be/src/vec/exec/join/join_op.h
@@ -18,7 +18,6 @@
#pragma once
#include "vec/common/arena.h"
#include "vec/common/columns_hashing.h"
-#include "vec/common/hash_table/hash_map.h"
#include "vec/core/block.h"
namespace doris::vectorized {
@@ -45,19 +44,19 @@ namespace doris::vectorized {
*/
struct RowRef {
uint32_t row_num = 0;
- uint8_t block_offset;
RowRef() = default;
- RowRef(size_t row_num_count, uint8_t block_offset_)
- : row_num(row_num_count), block_offset(block_offset_) {}
+ RowRef(size_t row_num_count) : row_num(row_num_count) {}
+ void clear() {};
};
struct RowRefWithFlag : public RowRef {
bool visited;
RowRefWithFlag() = default;
- RowRefWithFlag(size_t row_num_count, uint8_t block_offset_, bool
is_visited = false)
- : RowRef(row_num_count, block_offset_), visited(is_visited) {}
+ RowRefWithFlag(size_t row_num_count, bool is_visited = false)
+ : RowRef(row_num_count), visited(is_visited) {}
+ void clear() {};
};
/// Portion of RowRefs, 16 * (MAX_SIZE + 1) bytes sized.
@@ -93,14 +92,15 @@ public:
ForwardIterator() : root(nullptr), first(false), batch(nullptr),
position(0) {}
ForwardIterator(RowRefListType* begin)
- : root(begin), first(true), batch(root->next), position(0) {}
+ : root(begin), first(true), batch((&root->next)), position(0) {}
RowRefType& operator*() {
if (first) {
return *root;
}
- return batch->row_refs[position];
+ return batch->operator[](position);
}
+
RowRefType* operator->() { return &(**this); }
void operator++() {
@@ -109,21 +109,17 @@ public:
return;
}
- if (batch) {
+ if (batch && position < batch->size()) {
++position;
- if (position >= batch->size) {
- batch = batch->next;
- position = 0;
- }
}
}
- bool ok() const { return first || batch; }
+ bool ok() const { return first || (batch && position < batch->size()); }
private:
RowRefListType* root;
bool first;
- Batch<RowRefType>* batch;
+ std::vector<RowRefType>* batch;
size_t position;
};
@@ -131,76 +127,60 @@ struct RowRefList : RowRef {
using RowRefType = RowRef;
RowRefList() = default;
- RowRefList(size_t row_num_, uint8_t block_offset_) : RowRef(row_num_,
block_offset_) {}
+ RowRefList(size_t row_num_) : RowRef(row_num_) {}
ForwardIterator<RowRefList> begin() { return
ForwardIterator<RowRefList>(this); }
/// insert element after current one
- void insert(RowRefType&& row_ref, Arena& pool) {
- if (!next) {
- next = pool.alloc<Batch<RowRefType>>();
- *next = Batch<RowRefType>(nullptr);
- }
- next = next->insert(std::move(row_ref), pool);
- }
+ void insert(RowRefType&& row_ref, Arena& pool) {
next.emplace_back(std::move(row_ref)); }
+
+ void clear() { next.clear(); }
private:
friend class ForwardIterator<RowRefList>;
-
- Batch<RowRefType>* next = nullptr;
+ std::vector<RowRefType> next;
};
struct RowRefListWithFlag : RowRef {
using RowRefType = RowRef;
RowRefListWithFlag() = default;
- RowRefListWithFlag(size_t row_num_, uint8_t block_offset_) :
RowRef(row_num_, block_offset_) {}
+ RowRefListWithFlag(size_t row_num_) : RowRef(row_num_) {}
ForwardIterator<RowRefListWithFlag> const begin() {
return ForwardIterator<RowRefListWithFlag>(this);
}
/// insert element after current one
- void insert(RowRef&& row_ref, Arena& pool) {
- if (!next) {
- next = pool.alloc<Batch<RowRefType>>();
- *next = Batch<RowRefType>(nullptr);
- }
- next = next->insert(std::move(row_ref), pool);
- }
+ void insert(RowRefType&& row_ref, Arena& pool) {
next.emplace_back(row_ref); }
+
+ void clear() { next.clear(); }
bool visited = false;
private:
friend class ForwardIterator<RowRefListWithFlag>;
-
- Batch<RowRefType>* next = nullptr;
+ std::vector<RowRefType> next;
};
struct RowRefListWithFlags : RowRefWithFlag {
using RowRefType = RowRefWithFlag;
RowRefListWithFlags() = default;
- RowRefListWithFlags(size_t row_num_, uint8_t block_offset_)
- : RowRefWithFlag(row_num_, block_offset_) {}
+ RowRefListWithFlags(size_t row_num_) : RowRefWithFlag(row_num_) {}
ForwardIterator<RowRefListWithFlags> const begin() {
return ForwardIterator<RowRefListWithFlags>(this);
}
/// insert element after current one
- void insert(RowRefWithFlag&& row_ref, Arena& pool) {
- if (!next) {
- next = pool.alloc<Batch<RowRefType>>();
- *next = Batch<RowRefType>(nullptr);
- }
- next = next->insert(std::move(row_ref), pool);
- }
+ void insert(RowRefType&& row_ref, Arena& pool) {
next.emplace_back(row_ref); }
+
+ void clear() { next.clear(); }
private:
friend class ForwardIterator<RowRefListWithFlags>;
-
- Batch<RowRefType>* next = nullptr;
+ std::vector<RowRefType> next;
};
} // namespace doris::vectorized
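RowRefList, RowRefListWithFlag and RowRefListWithFlags now keep overflow matches in a std::vector instead of arena-allocated Batch nodes, and ForwardIterator yields the inline root element first and then walks that vector. A self-contained sketch of the same shape (illustrative types, not the Doris classes):

    #include <cstddef>
    #include <cstdint>
    #include <vector>

    struct Row {
        uint32_t row_num = 0;
    };

    // Same shape as the new RowRefList: the first match is stored inline in the
    // root, later matches go into the vector.
    struct RowList : Row {
        std::vector<Row> next;

        void insert(Row&& r) { next.emplace_back(r); }
        void clear() { next.clear(); }
    };

    // Same shape as the new ForwardIterator: yield the root once, then walk next.
    struct Iter {
        RowList* root;
        bool first = true;
        size_t pos = 0;

        Row& operator*() { return first ? *root : root->next[pos]; }
        void operator++() {
            if (first) {
                first = false;
            } else if (pos < root->next.size()) {
                ++pos;
            }
        }
        bool ok() const { return first || pos < root->next.size(); }
    };

Iteration keeps the familiar pattern used by the probe code: for (Iter it{&list}; it.ok(); ++it) { ... }.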
diff --git a/be/src/vec/exec/join/process_hash_table_probe.h
b/be/src/vec/exec/join/process_hash_table_probe.h
index 435cea84186..9c2fd6094b5 100644
--- a/be/src/vec/exec/join/process_hash_table_probe.h
+++ b/be/src/vec/exec/join/process_hash_table_probe.h
@@ -75,7 +75,7 @@ struct ProcessHashTableProbe {
UInt8* __restrict null_map_data,
UInt8* __restrict filter_map,
Block* output_block);
- void _emplace_element(int8_t block_offset, int32_t block_row, int&
current_offset);
+ void _emplace_element(int32_t block_row, int& current_offset);
template <typename HashTableType>
typename HashTableType::State _init_probe_side(HashTableType&
hash_table_ctx, size_t probe_rows,
@@ -94,14 +94,13 @@ struct ProcessHashTableProbe {
Parent* _parent;
const int _batch_size;
- std::shared_ptr<std::vector<Block>> _build_blocks;
+ std::shared_ptr<Block> _build_block;
std::unique_ptr<Arena> _arena;
std::vector<StringRef> _probe_keys;
std::vector<uint32_t> _probe_indexs;
- PaddedPODArray<int8_t> _build_block_offsets;
- PaddedPODArray<int32_t> _build_block_rows;
- std::vector<std::pair<int8_t, int>> _build_blocks_locs;
+ std::vector<uint32_t> _build_indexs;
+ std::vector<int> _build_blocks_locs;
// only need set the tuple is null in RIGHT_OUTER_JOIN and FULL_OUTER_JOIN
ColumnUInt8::Container* _tuple_is_null_left_flags;
// only need set the tuple is null in LEFT_OUTER_JOIN and FULL_OUTER_JOIN
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index f4a3010c49f..248e8f42328 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -32,7 +32,7 @@ template <int JoinOpType, typename Parent>
ProcessHashTableProbe<JoinOpType, Parent>::ProcessHashTableProbe(Parent*
parent, int batch_size)
: _parent(parent),
_batch_size(batch_size),
- _build_blocks(parent->build_blocks()),
+ _build_block(parent->build_block()),
_tuple_is_null_left_flags(parent->is_outer_join()
? &(reinterpret_cast<ColumnUInt8&>(
*parent->_tuple_is_null_left_flag_column)
@@ -69,51 +69,13 @@ void ProcessHashTableProbe<JoinOpType,
Parent>::build_side_output_column(
JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType ==
TJoinOp::FULL_OUTER_JOIN;
if (!is_semi_anti_join || have_other_join_conjunct) {
- if (_build_blocks->size() == 1) {
- for (int i = 0; i < _right_col_len; i++) {
- auto& column = *(*_build_blocks)[0].get_by_position(i).column;
- if (output_slot_flags[i]) {
- mcol[i + _right_col_idx]->insert_indices_from(column,
_build_block_rows.data(),
-
_build_block_rows.data() + size);
- } else {
- mcol[i + _right_col_idx]->insert_many_defaults(size);
- }
- }
- } else {
- for (int i = 0; i < _right_col_len; i++) {
- if (output_slot_flags[i]) {
- for (int j = 0; j < size; j++) {
- if constexpr (probe_all) {
- if (_build_block_offsets[j] == -1) {
- DCHECK(mcol[i +
_right_col_idx]->is_nullable());
- assert_cast<ColumnNullable*>(mcol[i +
_right_col_idx].get())
- ->insert_default();
- } else {
- auto& column =
*(*_build_blocks)[_build_block_offsets[j]]
- .get_by_position(i)
- .column;
- mcol[i + _right_col_idx]->insert_from(column,
_build_block_rows[j]);
- }
- } else {
- if (_build_block_offsets[j] == -1) {
- // the only case to reach here:
- // 1. left anti join with other conjuncts, and
- // 2. equal conjuncts does not match
- // since nullptr is emplaced back to
visited_map,
- // the output value of the build side does not
matter,
- // just insert default value
- mcol[i + _right_col_idx]->insert_default();
- } else {
- auto& column =
*(*_build_blocks)[_build_block_offsets[j]]
- .get_by_position(i)
- .column;
- mcol[i + _right_col_idx]->insert_from(column,
_build_block_rows[j]);
- }
- }
- }
- } else {
- mcol[i + _right_col_idx]->insert_many_defaults(size);
- }
+ for (int i = 0; i < _right_col_len; i++) {
+ const auto& column = *_build_block->get_by_position(i).column;
+ if (output_slot_flags[i]) {
+ mcol[i + _right_col_idx]->insert_indices_from_join(column,
_build_indexs.data(),
+
_build_indexs.data() + size);
+ } else {
+ mcol[i + _right_col_idx]->insert_many_defaults(size);
}
}
}
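With a single build block, build-side output columns are filled with one gather per column. insert_indices_from_join is added in the column files above but its body is not shown in this hunk; it is assumed to behave like an index-based gather where index 0 refers to the mocked first build row (the null_data loop below treats _build_indexs[i] == 0 as "no match"). A minimal sketch of such a gather on a plain vector column:

    #include <cstdint>
    #include <vector>

    // Assumed semantics of an index-based gather: dst receives src[idx] for each
    // idx in [begin, end); idx 0 points at the mocked first build row.
    void gather(std::vector<int64_t>& dst, const std::vector<int64_t>& src,
                const uint32_t* begin, const uint32_t* end) {
        dst.reserve(dst.size() + (end - begin));
        for (const uint32_t* it = begin; it != end; ++it) {
            dst.push_back(src[*it]);
        }
    }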
@@ -123,7 +85,7 @@ void ProcessHashTableProbe<JoinOpType,
Parent>::build_side_output_column(
_tuple_is_null_right_flags->resize(size);
auto* __restrict null_data = _tuple_is_null_right_flags->data();
for (int i = 0; i < size; ++i) {
- null_data[i] = _build_block_rows[i] == -1;
+ null_data[i] = _build_indexs[i] == 0;
}
}
}
@@ -166,8 +128,7 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
_right_col_len = _parent->right_table_data_types().size();
_row_count_from_last_probe = 0;
- _build_block_rows.clear();
- _build_block_offsets.clear();
+ _build_indexs.clear();
_probe_indexs.clear();
if (with_other_join_conjuncts) {
// use in right join to change visited state after exec the vother
join conjunct
@@ -176,14 +137,14 @@ typename HashTableType::State
ProcessHashTableProbe<JoinOpType, Parent>::_init_p
_visited_map.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
_same_to_prev.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
}
- _probe_indexs.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
- _build_block_rows.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
- _build_block_offsets.reserve(_batch_size * PROBE_SIDE_EXPLODE_RATE);
+ _probe_indexs.resize(_batch_size);
+ _build_indexs.resize(_batch_size);
if (!_parent->_ready_probe) {
_parent->_ready_probe = true;
hash_table_ctx.reset();
- hash_table_ctx.init_serialized_keys(_parent->_probe_columns,
probe_rows, null_map);
+ hash_table_ctx.init_serialized_keys(_parent->_probe_columns,
probe_rows, null_map, true,
+ false,
hash_table_ctx.hash_table->get_bucket_size());
}
return typename HashTableType::State(_parent->_probe_columns);
}
@@ -199,8 +160,7 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType,
Parent>::_probe_row_m
SCOPED_TIMER(_search_hashtable_timer);
for (; probe_row_match_iter.ok() && current_offset < _batch_size;
++probe_row_match_iter) {
- _emplace_element(probe_row_match_iter->block_offset,
probe_row_match_iter->row_num,
- current_offset);
+ _emplace_element(probe_row_match_iter->row_num, current_offset);
_probe_indexs.emplace_back(probe_index);
if constexpr (with_other_join_conjuncts) {
_visited_map.emplace_back(&probe_row_match_iter->visited);
@@ -218,11 +178,9 @@ ForwardIterator<Mapped>& ProcessHashTableProbe<JoinOpType,
Parent>::_probe_row_m
}
template <int JoinOpType, typename Parent>
-void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int8_t
block_offset,
- int32_t
block_row,
+void ProcessHashTableProbe<JoinOpType, Parent>::_emplace_element(int32_t
block_row,
int&
current_offset) {
- _build_block_offsets.emplace_back(block_offset);
- _build_block_rows.emplace_back(block_row);
+ _build_indexs.emplace_back(block_row);
current_offset++;
}
@@ -236,25 +194,17 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_process(HashTableType& hash
size_t
probe_rows) {
auto& probe_index = _parent->_probe_index;
- using KeyGetter = typename HashTableType::State;
using Mapped = typename HashTableType::Mapped;
- KeyGetter key_getter =
- _init_probe_side<HashTableType>(hash_table_ctx, probe_rows,
with_other_conjuncts,
- need_null_map_for_probe ?
null_map->data() : nullptr);
+ _init_probe_side<HashTableType>(hash_table_ctx, probe_rows,
with_other_conjuncts,
+ need_null_map_for_probe ? null_map->data()
: nullptr);
auto& mcol = mutable_block.mutable_columns();
- constexpr auto is_right_semi_anti_join =
- JoinOpType == TJoinOp::RIGHT_ANTI_JOIN || JoinOpType ==
TJoinOp::RIGHT_SEMI_JOIN;
-
- constexpr auto probe_all =
- JoinOpType == TJoinOp::LEFT_OUTER_JOIN || JoinOpType ==
TJoinOp::FULL_OUTER_JOIN;
-
int last_probe_index = probe_index;
int current_offset = 0;
- bool all_match_one = true;
+ bool all_match_one = false;
size_t probe_size = 0;
auto& probe_row_match_iter = _probe_row_match<Mapped,
with_other_conjuncts>(
@@ -283,121 +233,13 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::do_process(HashTableType& hash
{
SCOPED_TIMER(_search_hashtable_timer);
- using FindResult = KeyGetter::FindResult;
- FindResult empty = {nullptr, false};
- while (current_offset < _batch_size && probe_index < probe_rows) {
- if constexpr (ignore_null && need_null_map_for_probe) {
- if ((*null_map)[probe_index]) {
- if constexpr (probe_all) {
- // only full outer / left outer need insert the data
of right table
- _emplace_element(-1, -1, current_offset);
- _probe_indexs.emplace_back(probe_index);
-
- if constexpr (with_other_conjuncts) {
- _same_to_prev.emplace_back(false);
- _visited_map.emplace_back(nullptr);
- }
- } else {
- all_match_one = false;
- }
- probe_index++;
- continue;
- }
- }
-
- const auto& find_result = need_null_map_for_probe &&
(*null_map)[probe_index]
- ? empty
- :
hash_table_ctx.find(key_getter, probe_index);
-
- auto current_probe_index = probe_index;
- if constexpr (!with_other_conjuncts &&
- (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- JoinOpType == TJoinOp::LEFT_SEMI_JOIN)) {
- bool need_go_ahead =
- (JoinOpType != TJoinOp::LEFT_SEMI_JOIN) ^
find_result.is_found();
- if constexpr (is_mark_join) {
- ++current_offset;
- bool null_result = (need_null_map_for_probe &&
(*null_map)[probe_index]) ||
- (!need_go_ahead &&
*_has_null_in_build_side);
- if (null_result) {
- mark_column->insert_null();
- } else {
- mark_column->insert_value(need_go_ahead);
- }
- } else {
- current_offset += need_go_ahead;
- }
- ++probe_index;
- } else {
- if (find_result.is_found()) {
- auto& mapped = find_result.get_mapped();
- auto origin_offset = current_offset;
-
- // For mark join, if euqual-matched tuple count for one
probe row
- // excceeds batch size, it's difficult to implement the
logic to
- // split them into multiple sub blocks and handle them,
keep the original
- // logic for now.
- if constexpr (is_mark_join && with_other_conjuncts) {
- for (auto it = mapped.begin(); it.ok(); ++it) {
- _emplace_element(it->block_offset, it->row_num,
current_offset);
- _visited_map.emplace_back(&it->visited);
- }
- ++probe_index;
- } else if constexpr (with_other_conjuncts ||
!is_right_semi_anti_join) {
- auto multi_match_last_offset = current_offset;
- auto it = mapped.begin();
- for (; it.ok() && current_offset < _batch_size; ++it) {
- _emplace_element(it->block_offset, it->row_num,
current_offset);
-
- if constexpr (with_other_conjuncts) {
- _visited_map.emplace_back(&it->visited);
- }
- }
- probe_row_match_iter = it;
- if (!it.ok()) {
- // If all matched rows for the current probe row
are handled,
- // advance to next probe row.
- // If not(which means it excceed batch size),
probe_index is not increased and
- // remaining matched rows for the current probe
row will be
- // handled in the next call of this function
- ++probe_index;
- } else if constexpr (with_other_conjuncts) {
- multi_matched_output_row_count =
- current_offset - multi_match_last_offset;
- }
- } else {
- ++probe_index;
- }
- if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
- mapped.visited = true;
- }
-
- if constexpr (with_other_conjuncts) {
- _same_to_prev.emplace_back(false);
- for (int i = 0; i < current_offset - origin_offset -
1; ++i) {
- _same_to_prev.emplace_back(true);
- }
- }
- } else if constexpr (probe_all || JoinOpType ==
TJoinOp::LEFT_ANTI_JOIN ||
- JoinOpType ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ||
- (JoinOpType == TJoinOp::LEFT_SEMI_JOIN &&
is_mark_join)) {
- // only full outer / left outer need insert the data of
right table
- _emplace_element(-1, -1, current_offset);
-
- if constexpr (with_other_conjuncts) {
- _same_to_prev.emplace_back(false);
- _visited_map.emplace_back(nullptr);
- }
- ++probe_index;
- } else {
- ++probe_index;
- }
- }
- all_match_one &= (current_offset == _probe_indexs.size() + 1);
- _probe_indexs.resize(current_offset, current_probe_index);
- }
- probe_size = probe_index - last_probe_index +
probe_row_match_iter.ok();
+ auto [new_probe_idx, new_current_offset] =
+ hash_table_ctx.hash_table->template find_batch<JoinOpType>(
+ hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(), probe_index,
+ probe_rows, _probe_indexs.data(),
_build_indexs.data());
+ probe_index = new_probe_idx;
+ current_offset = new_current_offset;
+ probe_size = probe_index - last_probe_index;
}
build_side_output_column(mcol, *_right_output_slot_flags, current_offset,
with_other_conjuncts);
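The per-row probe loop built around key_getter/find_result is replaced by one batched call into the hash table. find_batch is assumed to consume probe rows starting at probe_index, emit matching (probe row, build row) index pairs into the two parallel arrays, and stop once the batch is full or the probe rows are exhausted; the real table additionally resumes a partially emitted probe row via probe_row_match_iter, which the toy below simply truncates. An inner-join-shaped sketch against a hypothetical chained table (layout and names are illustrative only):

    #include <cstdint>
    #include <utility>
    #include <vector>

    // Toy chained hash table: first[bucket] is the newest build row in that bucket,
    // next[row] chains rows with the same bucket; row 0 is the reserved mock row,
    // so 0 also means "end of chain".
    struct ToyJoinTable {
        std::vector<uint32_t> first; // size == bucket_size
        std::vector<uint32_t> next;  // size == build rows (row 0 unused)

        // Assumed find_batch contract: fill probe_idxs/build_idxs with matching
        // pairs until batch_size pairs are produced or probe rows run out, and
        // return the new probe position plus the number of pairs emitted.
        std::pair<uint32_t, int> find_batch(const uint32_t* probe_buckets,
                                            uint32_t probe_index, uint32_t probe_rows,
                                            int batch_size, uint32_t* probe_idxs,
                                            uint32_t* build_idxs) const {
            int offset = 0;
            while (probe_index < probe_rows && offset < batch_size) {
                for (uint32_t row = first[probe_buckets[probe_index]];
                     row != 0 && offset < batch_size; row = next[row]) {
                    probe_idxs[offset] = probe_index;
                    build_idxs[offset] = row;
                    ++offset;
                }
                ++probe_index;
            }
            return {probe_index, offset};
        }
    };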
@@ -776,129 +618,21 @@ template <typename HashTableType>
Status ProcessHashTableProbe<JoinOpType, Parent>::process_data_in_hashtable(
HashTableType& hash_table_ctx, MutableBlock& mutable_block, Block*
output_block,
bool* eos) {
- using Mapped = typename HashTableType::Mapped;
SCOPED_TIMER(_probe_process_hashtable_timer);
- if constexpr (std::is_same_v<Mapped, RowRefListWithFlag> ||
- std::is_same_v<Mapped, RowRefListWithFlags>) {
- hash_table_ctx.init_iterator();
- auto& mcol = mutable_block.mutable_columns();
-
- bool right_semi_anti_without_other = _is_right_semi_anti &&
!_have_other_join_conjunct;
- int right_col_idx =
- right_semi_anti_without_other ? 0 :
_parent->left_table_data_types().size();
- int right_col_len = _parent->right_table_data_types().size();
-
- auto& iter = hash_table_ctx.iterator;
- auto block_size = 0;
- auto& visited_iter =
-
std::get<ForwardIterator<Mapped>>(_parent->_outer_join_pull_visited_iter);
- _build_blocks_locs.resize(_batch_size);
- auto register_build_loc = [&](int8_t offset, int32_t row_nums) {
- _build_blocks_locs[block_size++] = std::pair<int8_t, int>(offset,
row_nums);
- };
-
- if (visited_iter.ok()) {
- if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
- for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
- register_build_loc(visited_iter->block_offset,
visited_iter->row_num);
- }
- } else {
- for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
- if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
- if (visited_iter->visited) {
- register_build_loc(visited_iter->block_offset,
visited_iter->row_num);
- }
- } else {
- if (!visited_iter->visited) {
- register_build_loc(visited_iter->block_offset,
visited_iter->row_num);
- }
- }
- }
- }
- if (!visited_iter.ok()) {
- ++iter;
- }
- }
-
- for (; iter != hash_table_ctx.hash_table->end() && block_size <
_batch_size; ++iter) {
- auto& mapped = iter->get_second();
- if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
- if (mapped.visited) {
- if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
- visited_iter = mapped.begin();
- for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
- register_build_loc(visited_iter->block_offset,
visited_iter->row_num);
- }
- if (visited_iter.ok()) {
- // block_size >= _batch_size, quit for loop
- break;
- }
- }
- } else {
- if constexpr (JoinOpType != TJoinOp::RIGHT_SEMI_JOIN) {
- visited_iter = mapped.begin();
- for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
- register_build_loc(visited_iter->block_offset,
visited_iter->row_num);
- }
- if (visited_iter.ok()) {
- // block_size >= _batch_size, quit for loop
- break;
- }
- }
- }
- } else {
- visited_iter = mapped.begin();
- for (; visited_iter.ok() && block_size < _batch_size;
++visited_iter) {
- if constexpr (JoinOpType == TJoinOp::RIGHT_SEMI_JOIN) {
- if (visited_iter->visited) {
- register_build_loc(visited_iter->block_offset,
visited_iter->row_num);
- }
- } else {
- if (!visited_iter->visited) {
- register_build_loc(visited_iter->block_offset,
visited_iter->row_num);
- }
- }
- }
- if (visited_iter.ok()) {
- // block_size >= _batch_size, quit for loop
- break;
- }
- }
- }
- _build_blocks_locs.resize(block_size);
-
- auto insert_build_rows = [&](int8_t offset) {
- for (size_t j = 0; j < right_col_len; ++j) {
- auto& column =
*(*_build_blocks)[offset].get_by_position(j).column;
- mcol[j + right_col_idx]->insert_indices_from(
- column, _build_block_rows.data(),
- _build_block_rows.data() + _build_block_rows.size());
- }
- };
- if (_build_blocks->size() > 1) {
- std::sort(_build_blocks_locs.begin(), _build_blocks_locs.end(),
- [](const auto a, const auto b) { return a.first >
b.first; });
- auto start = 0, end = 0;
- while (start < _build_blocks_locs.size()) {
- while (end < _build_blocks_locs.size() &&
- _build_blocks_locs[start].first ==
_build_blocks_locs[end].first) {
- end++;
- }
- auto offset = _build_blocks_locs[start].first;
- _build_block_rows.resize(end - start);
- for (int i = 0; start + i < end; i++) {
- _build_block_rows[i] = _build_blocks_locs[start +
i].second;
- }
- start = end;
- insert_build_rows(offset);
- }
- } else if (_build_blocks->size() == 1) {
- const auto size = _build_blocks_locs.size();
- _build_block_rows.resize(_build_blocks_locs.size());
- for (int i = 0; i < size; i++) {
- _build_block_rows[i] = _build_blocks_locs[i].second;
- }
- insert_build_rows(0);
+ auto& mcol = mutable_block.mutable_columns();
+ *eos = hash_table_ctx.hash_table->template
iterate_map<JoinOpType>(_build_indexs);
+ auto block_size = _build_indexs.size();
+ int right_col_idx =
+ JoinOpType == TJoinOp::RIGHT_OUTER_JOIN || JoinOpType ==
TJoinOp::FULL_OUTER_JOIN
+ ? _parent->left_table_data_types().size()
+ : 0;
+ int right_col_len = _parent->right_table_data_types().size();
+
+ if (block_size) {
+ for (size_t j = 0; j < right_col_len; ++j) {
+ const auto& column = *_build_block->get_by_position(j).column;
+ mcol[j + right_col_idx]->insert_indices_from_join(
+ column, _build_indexs.data(), _build_indexs.data() +
_build_indexs.size());
}
// just resize the left table columns when there are other conjuncts, so
the block size is not zero
@@ -917,15 +651,10 @@ Status ProcessHashTableProbe<JoinOpType,
Parent>::process_data_in_hashtable(
}
_tuple_is_null_left_flags->resize_fill(block_size, 1);
}
- *eos = iter == hash_table_ctx.hash_table->end();
- output_block->swap(
- mutable_block.to_block(right_semi_anti_without_other ?
right_col_idx : 0));
+ output_block->swap(mutable_block.to_block(0));
DCHECK(block_size <= _batch_size);
- return Status::OK();
- } else {
- LOG(FATAL) << "Invalid RowRefList";
- return Status::InvalidArgument("Invalid RowRefList");
}
+ return Status::OK();
}
template <int JoinOpType, typename Parent>
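process_data_in_hashtable no longer walks Batch chains and sorts per-block locations; instead iterate_map is assumed to scan the table's visited state, append the qualifying build row indexes into _build_indexs (unmatched rows for RIGHT_OUTER/RIGHT_ANTI, visited rows for RIGHT_SEMI) and report whether the scan reached the end. A hedged sketch of that kind of resumable scan, with illustrative types:

    #include <cstdint>
    #include <vector>

    // Assumed iterate_map behaviour for the RIGHT_OUTER/RIGHT_ANTI case:
    // append build rows that were never visited during probing, resuming from a
    // saved cursor, and report eos when the whole table has been scanned.
    struct ToyVisitedScan {
        std::vector<uint8_t> visited; // one flag per build row, row 0 is the mock row
        uint32_t cursor = 1;          // skip the mocked first row

        bool emit_unmatched(std::vector<uint32_t>& build_idxs, int batch_size) {
            build_idxs.clear();
            while (cursor < visited.size() && (int)build_idxs.size() < batch_size) {
                if (!visited[cursor]) {
                    build_idxs.push_back(cursor);
                }
                ++cursor;
            }
            return cursor == visited.size(); // eos
        }
    };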
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index 70eb7dd8a94..9e8c99c8d56 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -98,12 +98,6 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const
TPlanNode& tnode, const Descr
_arena = std::make_shared<Arena>();
_hash_table_variants = std::make_shared<HashTableVariants>();
_process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
- _build_blocks.reset(new std::vector<Block>());
-
- // avoid vector expand change block address.
- // one block can store 4g data, _build_blocks can store 128*4g data.
- // if probe data bigger than 512g, runtime filter maybe will core dump
when insert data.
- _build_blocks->reserve(HASH_JOIN_MAX_BUILD_BLOCK_COUNT);
}
Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -730,9 +724,6 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
SCOPED_TIMER(_exec_timer);
SCOPED_TIMER(_build_timer);
- // make one block for each 4 gigabytes
- constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
-
if (_has_null_in_build_side) {
// TODO: if _has_null_in_build_side is true we should finish current
pipeline task.
DCHECK(state->enable_pipeline_exec());
@@ -745,41 +736,25 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
if (in_block->rows() != 0) {
SCOPED_TIMER(_build_side_merge_block_timer);
+ if (_build_side_mutable_block.empty()) {
+ RETURN_IF_ERROR(_build_side_mutable_block.merge(
+ *(in_block->create_same_struct_block(1, false))));
+ }
RETURN_IF_ERROR(_build_side_mutable_block.merge(*in_block));
- }
-
- if (UNLIKELY(_build_side_mem_used - _build_side_last_mem_used >
BUILD_BLOCK_MAX_SIZE)) {
- if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
- return Status::NotSupported(strings::Substitute(
- "data size of right table in hash join > $0",
- BUILD_BLOCK_MAX_SIZE *
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
+ if (_build_side_mutable_block.rows() >
std::numeric_limits<uint32_t>::max()) {
+ return Status::NotSupported(
+ "Hash join do not support build table rows"
+ " over:" +
+ std::to_string(std::numeric_limits<uint32_t>::max()));
}
- _build_blocks->emplace_back(_build_side_mutable_block.to_block());
-
- COUNTER_UPDATE(_build_blocks_memory_usage,
(*_build_blocks)[_build_block_idx].bytes());
-
- // TODO:: Rethink may we should do the process after we receive
all build blocks ?
- // which is better.
- RETURN_IF_ERROR(_process_build_block(state,
(*_build_blocks)[_build_block_idx],
- _build_block_idx));
-
- _build_side_mutable_block = MutableBlock();
- ++_build_block_idx;
- _build_side_last_mem_used = _build_side_mem_used;
}
}
if (_should_build_hash_table && eos) {
if (!_build_side_mutable_block.empty()) {
- if (_build_blocks->size() == HASH_JOIN_MAX_BUILD_BLOCK_COUNT) {
- return Status::NotSupported(strings::Substitute(
- "data size of right table in hash join > $0",
- BUILD_BLOCK_MAX_SIZE *
HASH_JOIN_MAX_BUILD_BLOCK_COUNT));
- }
- _build_blocks->emplace_back(_build_side_mutable_block.to_block());
- COUNTER_UPDATE(_build_blocks_memory_usage,
(*_build_blocks)[_build_block_idx].bytes());
- RETURN_IF_ERROR(_process_build_block(state,
(*_build_blocks)[_build_block_idx],
- _build_block_idx));
+ _build_block =
std::make_shared<Block>(_build_side_mutable_block.to_block());
+ COUNTER_UPDATE(_build_blocks_memory_usage, _build_block->bytes());
+ RETURN_IF_ERROR(_process_build_block(state, *_build_block));
}
auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
LOG(FATAL) << "FATAL: uninited
hash table";
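Sink now accumulates the whole build side into one mutable block, prepending a single extra row via create_same_struct_block(1, false) so that build index 0 can serve as the "no match" sentinel, and it rejects inputs whose row count would overflow the 32-bit build indexes. A tiny illustration of that convention (hypothetical types, not the Doris Block API):

    #include <cstdint>
    #include <limits>
    #include <vector>

    // Sketch of the single-build-block convention: row 0 is a mocked row, real
    // build rows start at 1, and indexes are uint32_t, which is why the sink path
    // caps the build side at numeric_limits<uint32_t>::max() rows.
    struct ToyBuildBlock {
        std::vector<int64_t> col; // one column, for brevity

        ToyBuildBlock() { col.push_back(0); } // the mocked sentinel row

        bool append(const std::vector<int64_t>& rows) {
            if (col.size() + rows.size() > std::numeric_limits<uint32_t>::max()) {
                return false; // same limit the sink path enforces
            }
            col.insert(col.end(), rows.begin(), rows.end());
            return true;
        }
    };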
@@ -801,7 +776,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
_shared_hash_table_context->status = Status::OK();
// arena will be shared with other instances.
_shared_hash_table_context->arena = _arena;
- _shared_hash_table_context->blocks = _build_blocks;
+ _shared_hash_table_context->block = _build_block;
_shared_hash_table_context->hash_table_variants =
_hash_table_variants;
_shared_hash_table_context->short_circuit_for_null_in_probe_side =
_has_null_in_build_side;
@@ -833,7 +808,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
*_hash_table_variants,
*std::static_pointer_cast<HashTableVariants>(
_shared_hash_table_context->hash_table_variants));
- _build_blocks = _shared_hash_table_context->blocks;
+ _build_block = _shared_hash_table_context->block;
if (!_shared_hash_table_context->runtime_filters.empty()) {
auto ret = std::visit(
@@ -866,7 +841,7 @@ Status HashJoinNode::sink(doris::RuntimeState* state,
vectorized::Block* in_bloc
// Since the comparison of null values is meaningless, null aware left
anti join should not output null
// when the build side is not empty.
- if (!_build_blocks->empty() && _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
+ if (_build_block && _join_op == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) {
_probe_ignore_null = true;
}
_init_short_circuit_for_probe();
@@ -965,7 +940,7 @@ void HashJoinNode::_set_build_ignore_flag(Block& block,
const std::vector<int>&
}
}
-Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block,
uint8_t offset) {
+Status HashJoinNode::_process_build_block(RuntimeState* state, Block& block) {
SCOPED_TIMER(_build_table_timer);
size_t rows = block.rows();
if (UNLIKELY(rows == 0)) {
@@ -996,28 +971,30 @@ Status HashJoinNode::_process_build_block(RuntimeState*
state, Block& block, uin
Status st = _extract_join_column<true>(block, null_map_val, raw_ptrs,
res_col_ids);
st = std::visit(
- Overload {
- [&](std::monostate& arg, auto has_null_value,
- auto short_circuit_for_null_in_build_side) -> Status {
- LOG(FATAL) << "FATAL: uninited hash table";
- __builtin_unreachable();
- return Status::OK();
- },
- [&](auto&& arg, auto has_null_value,
- auto short_circuit_for_null_in_build_side) -> Status {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
- hash_table_build_process(rows, block,
raw_ptrs, this,
- state->batch_size(),
offset, state);
- return hash_table_build_process
- .template run<has_null_value,
short_circuit_for_null_in_build_side>(
- arg,
- has_null_value ||
short_circuit_for_null_in_build_side
- ? &null_map_val->get_data()
- : nullptr,
- &_has_null_in_build_side);
- }},
- *_hash_table_variants, make_bool_variant(_build_side_ignore_null),
+ Overload {[&](std::monostate& arg, auto join_op, auto
has_null_value,
+ auto short_circuit_for_null_in_build_side) -> Status
{
+ LOG(FATAL) << "FATAL: uninited hash table";
+ __builtin_unreachable();
+ return Status::OK();
+ },
+ [&](auto&& arg, auto&& join_op, auto has_null_value,
+ auto short_circuit_for_null_in_build_side) -> Status
{
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ using JoinOpType = std::decay_t<decltype(join_op)>;
+
+ ProcessHashTableBuild<HashTableCtxType, HashJoinNode>
+ hash_table_build_process(rows, block,
raw_ptrs, this,
+
state->batch_size(), state);
+ return hash_table_build_process
+ .template run<JoinOpType::value,
has_null_value,
+
short_circuit_for_null_in_build_side>(
+ arg,
+ has_null_value ||
short_circuit_for_null_in_build_side
+ ? &null_map_val->get_data()
+ : nullptr,
+ &_has_null_in_build_side);
+ }},
+ *_hash_table_variants, _join_op_variants,
make_bool_variant(_build_side_ignore_null),
make_bool_variant(_short_circuit_for_null_in_build_side));
return st;
@@ -1086,7 +1063,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
return;
}
- if (!try_get_hash_map_context_fixed<PartitionedHashMap,
HashCRC32, RowRefListType>(
+ if (!try_get_hash_map_context_fixed<JoinFixedHashMap,
HashCRC32, RowRefListType>(
*_hash_table_variants, _build_expr_ctxs)) {
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListType>>();
}
@@ -1094,16 +1071,6 @@ void HashJoinNode::_hash_table_init(RuntimeState* state)
{
_join_op_variants, make_bool_variant(_have_other_join_conjunct));
DCHECK(!std::holds_alternative<std::monostate>(*_hash_table_variants));
-
- std::visit(Overload {[&](std::monostate& arg) {
- LOG(FATAL) << "FATAL: uninited hash table";
- __builtin_unreachable();
- },
- [&](auto&& arg) {
- arg.hash_table->set_partitioned_threshold(
-
state->partitioned_hash_join_rows_threshold());
- }},
- *_hash_table_variants);
}
void HashJoinNode::_process_hashtable_ctx_variants_init(RuntimeState* state) {
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 06c28de46c6..5633d606d01 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -86,10 +86,10 @@ struct ProcessRuntimeFilterBuild {
RETURN_IF_ERROR(parent->_runtime_filter_slots->init(
state, hash_table_ctx.hash_table->size(),
parent->_build_rf_cardinality));
- if (!parent->_runtime_filter_slots->empty() &&
!parent->_inserted_rows.empty()) {
+ if (!parent->_runtime_filter_slots->empty() &&
!parent->_inserted_blocks.empty()) {
{
SCOPED_TIMER(parent->_push_compute_timer);
- parent->_runtime_filter_slots->insert(parent->_inserted_rows);
+
parent->_runtime_filter_slots->insert(parent->_inserted_blocks);
}
}
{
@@ -106,140 +106,52 @@ using ProfileCounter = RuntimeProfile::Counter;
template <class HashTableContext, typename Parent>
struct ProcessHashTableBuild {
ProcessHashTableBuild(int rows, Block& acquired_block, ColumnRawPtrs&
build_raw_ptrs,
- Parent* parent, int batch_size, uint8_t offset,
RuntimeState* state)
+ Parent* parent, int batch_size, RuntimeState* state)
: _rows(rows),
- _skip_rows(0),
_acquired_block(acquired_block),
_build_raw_ptrs(build_raw_ptrs),
_parent(parent),
_batch_size(batch_size),
- _offset(offset),
_state(state),
_build_side_compute_hash_timer(parent->_build_side_compute_hash_timer) {}
- template <bool ignore_null, bool short_circuit_for_null>
+ template <int JoinOpType, bool ignore_null, bool short_circuit_for_null>
Status run(HashTableContext& hash_table_ctx, ConstNullMapPtr null_map,
bool* has_null_key) {
- using KeyGetter = typename HashTableContext::State;
- using Mapped = typename HashTableContext::Mapped;
-
- Defer defer {[&]() {
- int64_t bucket_size =
hash_table_ctx.hash_table->get_buffer_size_in_cells();
- int64_t filled_bucket_size = hash_table_ctx.hash_table->size();
- int64_t bucket_bytes =
hash_table_ctx.hash_table->get_buffer_size_in_bytes();
- COUNTER_SET(_parent->_hash_table_memory_usage, bucket_bytes);
- COUNTER_SET(_parent->_build_buckets_counter, bucket_size);
- COUNTER_SET(_parent->_build_collisions_counter,
- hash_table_ctx.hash_table->get_collisions());
- COUNTER_SET(_parent->_build_buckets_fill_counter,
filled_bucket_size);
-
- auto hash_table_buckets =
hash_table_ctx.hash_table->get_buffer_sizes_in_cells();
- std::string hash_table_buckets_info;
- for (auto bucket_count : hash_table_buckets) {
- hash_table_buckets_info += std::to_string(bucket_count) + ", ";
+ if (short_circuit_for_null || ignore_null) {
+ for (uint32_t i = 0; i < _rows; i++) {
+ if ((*null_map)[i]) {
+ *has_null_key = true;
+ }
}
- _parent->add_hash_buckets_info(hash_table_buckets_info);
-
- auto hash_table_sizes = hash_table_ctx.hash_table->sizes();
- hash_table_buckets_info.clear();
- for (auto table_size : hash_table_sizes) {
- hash_table_buckets_info += std::to_string(table_size) + ", ";
+ if (short_circuit_for_null && *has_null_key) {
+ return Status::OK();
}
- _parent->add_hash_buckets_filled_info(hash_table_buckets_info);
- }};
-
- KeyGetter key_getter(_build_raw_ptrs);
-
- SCOPED_TIMER(_parent->_build_table_insert_timer);
- hash_table_ctx.hash_table->reset_resize_timer();
-
- // only not build_unique, we need expanse hash table before insert data
- // 1. There are fewer duplicate keys, reducing the number of resize
hash tables
- // can improve performance to a certain extent, about 2%-5%
- // 2. There are many duplicate keys, and the hash table filled bucket
is far less than
- // the hash table build bucket, which may waste a lot of memory.
- // TODO, use the NDV expansion of the key column in the optimizer
statistics
- if (!_parent->build_unique()) {
-
RETURN_IF_CATCH_EXCEPTION(hash_table_ctx.hash_table->expanse_for_add_elem(
- std::min<int>(_rows,
config::hash_table_pre_expanse_max_rows)));
- }
-
- vector<int>& inserted_rows = _parent->_inserted_rows[&_acquired_block];
- bool has_runtime_filter = !_parent->runtime_filter_descs().empty();
- if (has_runtime_filter) {
- inserted_rows.reserve(_batch_size);
}
- hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
- null_map ? null_map->data() :
nullptr);
-
- auto& arena = *_parent->arena();
- auto old_build_arena_memory = arena.size();
-
- size_t k = 0;
- bool inserted = false;
- auto creator = [&](const auto& ctor, auto& key, auto& origin) {
- HashTableContext::try_presis_key(key, origin, arena);
- inserted = true;
- ctor(key, Mapped {k, _offset});
- };
-
- bool build_unique = _parent->build_unique();
-#define EMPLACE_IMPL(stmt) \
- for (; k < _rows; ++k) { \
- if (k % CHECK_FRECUENCY == 0) { \
- RETURN_IF_CANCELLED(_state); \
- } \
- if constexpr (short_circuit_for_null) { \
- if ((*null_map)[k]) { \
- *has_null_key = true; \
- return Status::OK(); \
- } \
- } else if constexpr (ignore_null) { \
- if ((*null_map)[k]) { \
- *has_null_key = true; \
- continue; \
- } \
- } \
- inserted = false; \
- [[maybe_unused]] auto& mapped = \
- hash_table_ctx.lazy_emplace(key_getter, k, creator, nullptr); \
- stmt; \
- }
-
- if (has_runtime_filter && build_unique) {
- EMPLACE_IMPL(
- if (inserted) { inserted_rows.push_back(k); } else {
_skip_rows++; });
- } else if (has_runtime_filter && !build_unique) {
- EMPLACE_IMPL(
- if (inserted) { inserted_rows.push_back(k); } else {
- mapped.insert({k, _offset}, *_parent->arena());
- inserted_rows.push_back(k);
- });
- } else if (!has_runtime_filter && build_unique) {
- EMPLACE_IMPL(if (!inserted) { _skip_rows++; });
- } else {
- EMPLACE_IMPL(if (!inserted) { mapped.insert({k, _offset},
*_parent->arena()); });
+ if (!_parent->runtime_filter_descs().empty()) {
+ _parent->_inserted_blocks.insert(&_acquired_block);
}
- _parent->_build_rf_cardinality += inserted_rows.size();
- _parent->_build_arena_memory_usage->add(arena.size() -
old_build_arena_memory);
+ SCOPED_TIMER(_parent->_build_table_insert_timer);
+ hash_table_ctx.hash_table->template prepare_build<JoinOpType>(_rows,
_state->batch_size());
- COUNTER_UPDATE(_parent->_build_table_expanse_timer,
- hash_table_ctx.hash_table->get_resize_timer_value());
- COUNTER_UPDATE(_parent->_build_table_convert_timer,
- hash_table_ctx.hash_table->get_convert_timer_value());
+ hash_table_ctx.init_serialized_keys(_build_raw_ptrs, _rows,
+ null_map ? null_map->data() :
nullptr, true, true,
+
hash_table_ctx.hash_table->get_bucket_size());
+ hash_table_ctx.hash_table->build(hash_table_ctx.keys,
hash_table_ctx.bucket_nums.data(),
+ _rows);
+ hash_table_ctx.bucket_nums.resize(_state->batch_size());
+ hash_table_ctx.bucket_nums.shrink_to_fit();
return Status::OK();
}
private:
- const int _rows;
- int _skip_rows;
+ const uint32_t _rows;
Block& _acquired_block;
ColumnRawPtrs& _build_raw_ptrs;
Parent* _parent;
int _batch_size;
- uint8_t _offset;
RuntimeState* _state;
ProfileCounter* _build_side_compute_hash_timer;
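ProcessHashTableBuild::run drops the per-row EMPLACE_IMPL path: it scans the null map up front (short-circuiting if required), lets prepare_build size the table, computes per-row bucket numbers through init_serialized_keys with is_join/is_build set, and finally calls build once over all rows. Continuing the toy chained table from the probe sketch earlier, a one-pass bulk build could look like this (assumed semantics, not Doris's hash_table->build):

    #include <cstdint>
    #include <vector>

    // Bulk build for the toy chained table: rows are linked newest-first per
    // bucket in a single pass, no per-row emplace. 'first' must already be sized
    // to bucket_size and zero-filled; row 0 is the mocked row, so real build rows
    // start at 1 and 0 stays the "end of chain" marker.
    void toy_build(std::vector<uint32_t>& first, std::vector<uint32_t>& next,
                   const uint32_t* bucket_nums, uint32_t rows) {
        next.assign(rows, 0);
        for (uint32_t k = 1; k < rows; ++k) {
            uint32_t b = bucket_nums[k];
            next[k] = first[b];
            first[b] = k;
        }
    }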
@@ -325,8 +237,6 @@ using HashTableIteratorVariants =
std::variant<std::monostate, ForwardIterator<RowRefList>,
ForwardIterator<RowRefListWithFlag>,
ForwardIterator<RowRefListWithFlags>>;
-static constexpr auto HASH_JOIN_MAX_BUILD_BLOCK_COUNT = 128;
-
class HashJoinNode final : public VJoinNodeBase {
public:
HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
@@ -369,7 +279,7 @@ public:
bool have_other_join_conjunct() const { return _have_other_join_conjunct; }
bool is_right_semi_anti() const { return _is_right_semi_anti; }
bool is_outer_join() const { return _is_outer_join; }
- std::shared_ptr<std::vector<Block>> build_blocks() const { return
_build_blocks; }
+ std::shared_ptr<Block> build_block() const { return _build_block; }
std::vector<bool>* left_output_slot_flags() { return
&_left_output_slot_flags; }
std::vector<bool>* right_output_slot_flags() { return
&_right_output_slot_flags; }
bool* has_null_in_build_side() { return &_has_null_in_build_side; }
@@ -390,16 +300,16 @@ private:
_short_circuit_for_probe =
(_has_null_in_build_side && _join_op ==
TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN &&
!_is_mark_join) ||
- (_build_blocks->empty() && _join_op == TJoinOp::INNER_JOIN &&
!_is_mark_join) ||
- (_build_blocks->empty() && _join_op == TJoinOp::LEFT_SEMI_JOIN
&& !_is_mark_join) ||
- (_build_blocks->empty() && _join_op ==
TJoinOp::RIGHT_OUTER_JOIN) ||
- (_build_blocks->empty() && _join_op ==
TJoinOp::RIGHT_SEMI_JOIN) ||
- (_build_blocks->empty() && _join_op ==
TJoinOp::RIGHT_ANTI_JOIN);
+ (!_build_block && _join_op == TJoinOp::INNER_JOIN &&
!_is_mark_join) ||
+ (!_build_block && _join_op == TJoinOp::LEFT_SEMI_JOIN &&
!_is_mark_join) ||
+ (!_build_block && _join_op == TJoinOp::RIGHT_OUTER_JOIN) ||
+ (!_build_block && _join_op == TJoinOp::RIGHT_SEMI_JOIN) ||
+ (!_build_block && _join_op == TJoinOp::RIGHT_ANTI_JOIN);
// when the build table has 0 rows, there is no other_join_conjunct, _is_mark_join is
false, and the join type is one of
LEFT_OUTER_JOIN/FULL_OUTER_JOIN/LEFT_ANTI_JOIN,
// the result is simply the probe table plus a null column (if output is needed)
_empty_right_table_need_probe_dispose =
- (_build_blocks->empty() && !_have_other_join_conjunct &&
!_is_mark_join) &&
+ (!_build_block && !_have_other_join_conjunct &&
!_is_mark_join) &&
(_join_op == TJoinOp::LEFT_OUTER_JOIN || _join_op ==
TJoinOp::FULL_OUTER_JOIN ||
_join_op == TJoinOp::LEFT_ANTI_JOIN);
}
@@ -467,7 +377,7 @@ private:
HashTableIteratorVariants _outer_join_pull_visited_iter;
HashTableIteratorVariants _probe_row_match_iter;
- std::shared_ptr<std::vector<Block>> _build_blocks;
+ std::shared_ptr<Block> _build_block;
Block _probe_block;
ColumnRawPtrs _probe_columns;
ColumnUInt8::MutablePtr _null_map_column;
@@ -501,7 +411,7 @@ private:
Status _materialize_build_side(RuntimeState* state) override;
- Status _process_build_block(RuntimeState* state, Block& block, uint8_t
offset);
+ Status _process_build_block(RuntimeState* state, Block& block);
Status _do_evaluate(Block& block, VExprContextSPtrs& exprs,
RuntimeProfile::Counter& expr_call_timer,
std::vector<int>& res_col_ids);
@@ -539,7 +449,7 @@ private:
friend struct ProcessRuntimeFilterBuild;
std::vector<TRuntimeFilterDesc> _runtime_filter_descs;
- std::unordered_map<const Block*, std::vector<int>> _inserted_rows;
+ std::unordered_set<const Block*> _inserted_blocks;
std::vector<IRuntimeFilter*> _runtime_filters;
size_t _build_rf_cardinality = 0;
diff --git a/be/src/vec/exec/vset_operation_node.cpp
b/be/src/vec/exec/vset_operation_node.cpp
index 791e92679d0..80990830e53 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -60,7 +60,6 @@
VSetOperationNode<is_intersect>::VSetOperationNode(ObjectPool* pool, const TPlan
: ExecNode(pool, tnode, descs),
_valid_element_in_hash_tbl(0),
_mem_used(0),
- _build_block_index(0),
_build_finished(false) {
_hash_table_variants = std::make_unique<HashTableVariants>();
}
@@ -221,7 +220,7 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
}
return;
}
- if (!try_get_hash_map_context_fixed<PartitionedHashMap, HashCRC32,
RowRefListWithFlags>(
+ if (!try_get_hash_map_context_fixed<JoinFixedHashMap, HashCRC32,
RowRefListWithFlags>(
*_hash_table_variants, _child_expr_lists[0])) {
_hash_table_variants->emplace<SerializedHashTableContext<RowRefListWithFlags>>();
}
@@ -230,36 +229,46 @@ void VSetOperationNode<is_intersect>::hash_table_init() {
template <bool is_intersect>
Status VSetOperationNode<is_intersect>::sink(RuntimeState* state, Block*
block, bool eos) {
SCOPED_TIMER(_exec_timer);
- constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
if (block->rows() != 0) {
_mem_used += block->allocated_bytes();
RETURN_IF_ERROR(_mutable_block.merge(*block));
}
- if (eos || _mutable_block.allocated_bytes() >= BUILD_BLOCK_MAX_SIZE) {
- _build_blocks.emplace_back(_mutable_block.to_block());
- RETURN_IF_ERROR(
- process_build_block(_build_blocks[_build_block_index],
_build_block_index, state));
+ if (block->rows() != 0) {
+ if (_build_block.empty()) {
+
RETURN_IF_ERROR(_mutable_block.merge(*(block->create_same_struct_block(0,
false))));
+ }
+ RETURN_IF_ERROR(_mutable_block.merge(*block));
+ if (_mutable_block.rows() > std::numeric_limits<uint32_t>::max()) {
+ return Status::NotSupported(
+ "Hash join do not support build table rows"
+ " over:" +
+ std::to_string(std::numeric_limits<uint32_t>::max()));
+ }
+ }
+
+ if (eos) {
+ if (!_mutable_block.empty()) {
+ _build_block = _mutable_block.to_block();
+ }
+ RETURN_IF_ERROR(process_build_block(_build_block, state));
_mutable_block.clear();
- ++_build_block_index;
- if (eos) {
- if constexpr (is_intersect) {
- _valid_element_in_hash_tbl = 0;
- } else {
- std::visit(
- [&](auto&& arg) {
- using HashTableCtxType =
std::decay_t<decltype(arg)>;
- if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
- _valid_element_in_hash_tbl =
arg.hash_table->size();
- }
- },
- *_hash_table_variants);
- }
- _build_finished = true;
- _can_read = _children.size() == 1;
+ if constexpr (is_intersect) {
+ _valid_element_in_hash_tbl = 0;
+ } else {
+ std::visit(
+ [&](auto&& arg) {
+ using HashTableCtxType = std::decay_t<decltype(arg)>;
+ if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
+ _valid_element_in_hash_tbl =
arg.hash_table->size();
+ }
+ },
+ *_hash_table_variants);
}
+ _build_finished = true;
+ _can_read = _children.size() == 1;
}
return Status::OK();
}
@@ -312,8 +321,7 @@ Status
VSetOperationNode<is_intersect>::hash_table_build(RuntimeState* state) {
}
template <bool is_intersect>
-Status VSetOperationNode<is_intersect>::process_build_block(Block& block,
uint8_t offset,
- RuntimeState*
state) {
+Status VSetOperationNode<is_intersect>::process_build_block(Block& block,
RuntimeState* state) {
size_t rows = block.rows();
if (rows == 0) {
return Status::OK();
@@ -328,7 +336,7 @@ Status
VSetOperationNode<is_intersect>::process_build_block(Block& block, uint8_
using HashTableCtxType = std::decay_t<decltype(arg)>;
if constexpr (!std::is_same_v<HashTableCtxType,
std::monostate>) {
HashTableBuild<HashTableCtxType, is_intersect>
hash_table_build_process(
- this, rows, raw_ptrs, offset, state);
+ this, rows, raw_ptrs, state);
st = hash_table_build_process(arg, _arena);
} else {
LOG(FATAL) << "FATAL: uninited hash table";
@@ -344,8 +352,8 @@ void
VSetOperationNode<is_intersect>::add_result_columns(RowRefListWithFlags& va
int& block_size) {
auto it = value.begin();
for (auto idx = _build_col_idx.begin(); idx != _build_col_idx.end();
++idx) {
- auto& column =
*_build_blocks[it->block_offset].get_by_position(idx->first).column;
- if (_mutable_cols[idx->second]->is_nullable() xor
column.is_nullable()) {
+ const auto& column = *_build_block.get_by_position(idx->first).column;
+ if (_mutable_cols[idx->second]->is_nullable() ^ column.is_nullable()) {
DCHECK(_mutable_cols[idx->second]->is_nullable());
((ColumnNullable*)(_mutable_cols[idx->second].get()))
->insert_from_not_nullable(column, it->row_num);
@@ -514,10 +522,6 @@ void VSetOperationNode<is_intersect>::debug_string(int
indentation_level,
template <bool is_intersect>
void VSetOperationNode<is_intersect>::release_mem() {
_hash_table_variants = nullptr;
-
- std::vector<Block> tmp_build_blocks;
- _build_blocks.swap(tmp_build_blocks);
-
_probe_block.clear();
}
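The set operation node follows the same single-build-block change, so add_result_columns reads every output value from _build_block by (column id, row_num). The nullability mismatch it handles (xor rewritten as ^) amounts to wrapping a non-nullable source value into a nullable destination on insert; a trivial sketch with illustrative types:

    #include <cstdint>
    #include <optional>
    #include <vector>

    // Sketch of insert_from_not_nullable's role here: the destination column is
    // nullable, the build block's column is not, so the value is stored as a
    // non-null optional (std::optional stands in for ColumnNullable).
    void insert_from_not_nullable(std::vector<std::optional<int64_t>>& dst,
                                  const std::vector<int64_t>& src, uint32_t row) {
        dst.emplace_back(src[row]);
    }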
diff --git a/be/src/vec/exec/vset_operation_node.h
b/be/src/vec/exec/vset_operation_node.h
index ff016469f49..8ca04f2f71f 100644
--- a/be/src/vec/exec/vset_operation_node.h
+++ b/be/src/vec/exec/vset_operation_node.h
@@ -82,7 +82,7 @@ private:
//It's time to abstract out the same methods and provide them directly to
others;
void hash_table_init();
Status hash_table_build(RuntimeState* state);
- Status process_build_block(Block& block, uint8_t offset, RuntimeState*
state);
+ Status process_build_block(Block& block, RuntimeState* state);
Status extract_build_column(Block& block, ColumnRawPtrs& raw_ptrs);
Status extract_probe_column(Block& block, ColumnRawPtrs& raw_ptrs, int
child_id);
void refresh_hash_table();
@@ -115,11 +115,10 @@ private:
//record insert column id during probe
std::vector<uint16_t> _probe_column_inserted_id;
- std::vector<Block> _build_blocks;
+ Block _build_block;
Block _probe_block;
ColumnRawPtrs _probe_columns;
std::vector<MutableColumnPtr> _mutable_cols;
- int _build_block_index;
bool _build_finished;
std::vector<bool> _probe_finished_children_index;
MutableBlock _mutable_block;
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index 6b31cf07ec9..e1c01709042 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -53,18 +53,15 @@ struct SharedRuntimeFilterContext {
struct SharedHashTableContext {
SharedHashTableContext()
- : hash_table_variants(nullptr),
- blocks(new std::vector<vectorized::Block>()),
- signaled(false),
- short_circuit_for_null_in_probe_side(false) {}
+ : hash_table_variants(nullptr),
block(std::make_shared<vectorized::Block>()) {}
Status status;
std::shared_ptr<Arena> arena;
std::shared_ptr<void> hash_table_variants;
- std::shared_ptr<std::vector<Block>> blocks;
+ std::shared_ptr<Block> block;
std::map<int, SharedRuntimeFilterContext> runtime_filters;
- bool signaled;
- bool short_circuit_for_null_in_probe_side;
+ bool signaled {};
+ bool short_circuit_for_null_in_probe_side {};
};
using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]