This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9103ded1dd [improvement](join)optimize sharing hash table for
broadcast join (#14371)
9103ded1dd is described below
commit 9103ded1dd8b482ffc8a9f54b9e90fa82402b3a9
Author: Jerry Hu <[email protected]>
AuthorDate: Thu Nov 24 21:06:44 2022 +0800
[improvement](join)optimize sharing hash table for broadcast join (#14371)
This PR makes hash table sharing for broadcast join more robust:
Add a session variable to enable/disable this feature.
Do not block the hash join node's close function.
Use shared pointer to share hash table and runtime filter in broadcast join
nodes.
The hash join node that doesn't need to build the hash table will close the
right child without reading any data (the child will close the corresponding
sender).
---
be/src/exec/hash_join_node.cpp | 2 +-
be/src/exec/olap_scan_node.cpp | 2 +-
be/src/exprs/runtime_filter.cpp | 169 +++++++-------
be/src/exprs/runtime_filter.h | 4 +-
be/src/exprs/runtime_filter_slots.h | 22 +-
be/src/runtime/fragment_mgr.cpp | 41 +---
be/src/runtime/fragment_mgr.h | 4 -
be/src/runtime/query_fragments_ctx.h | 6 +-
be/src/runtime/runtime_filter_mgr.cpp | 5 +-
be/src/runtime/runtime_filter_mgr.h | 4 +-
be/src/runtime/runtime_state.h | 5 +
.../vec/exec/join/process_hash_table_probe_impl.h | 32 ++-
be/src/vec/exec/join/vhash_join_node.cpp | 260 ++++++++++++---------
be/src/vec/exec/join/vhash_join_node.h | 23 +-
be/src/vec/exec/scan/vscan_node.cpp | 2 +-
.../vec/runtime/shared_hash_table_controller.cpp | 112 +++------
be/src/vec/runtime/shared_hash_table_controller.h | 76 +++---
be/src/vec/runtime/shared_hashtable_controller.cpp | 95 --------
be/src/vec/runtime/shared_hashtable_controller.h | 75 ------
.../java/org/apache/doris/qe/SessionVariable.java | 7 +
gensrc/thrift/PaloInternalService.thrift | 2 +
21 files changed, 363 insertions(+), 585 deletions(-)
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index da0fc10ea7..471801abca 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -92,7 +92,7 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
_runtime_filters.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
- RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i],
state->query_options()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
_runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 60a8c3b2bc..5bf924d1c1 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -88,7 +88,7 @@ Status OlapScanNode::init(const TPlanNode& tnode,
RuntimeState* state) {
for (int i = 0; i < filter_size; ++i) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
- RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
RuntimeFilterRole::CONSUMER, filter_desc,
state->query_options(), id()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
&runtime_filter));
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index f92869b5f4..c9b3a3e2a1 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -42,6 +42,7 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vliteral.h"
#include "vec/exprs/vruntimefilter_wrapper.h"
+#include "vec/runtime/shared_hash_table_controller.h"
namespace doris {
// PrimitiveType->TExprNodeType
@@ -431,23 +432,25 @@ public:
_max_in_num = params->max_in_num;
switch (_filter_type) {
case RuntimeFilterType::IN_FILTER: {
- _hybrid_set.reset(create_set(_column_return_type,
_state->enable_vectorized_exec()));
+ _context.hybrid_set.reset(
+ create_set(_column_return_type,
_state->enable_vectorized_exec()));
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
- _minmax_func.reset(create_minmax_filter(_column_return_type));
+
_context.minmax_func.reset(create_minmax_filter(_column_return_type));
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
_is_bloomfilter = true;
- _bloomfilter_func.reset(create_bloom_filter(_column_return_type));
- _bloomfilter_func->set_length(params->bloom_filter_size);
+
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
+ _context.bloom_filter_func->set_length(params->bloom_filter_size);
return Status::OK();
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
- _hybrid_set.reset(create_set(_column_return_type,
_state->enable_vectorized_exec()));
- _bloomfilter_func.reset(create_bloom_filter(_column_return_type));
- _bloomfilter_func->set_length(params->bloom_filter_size);
+ _context.hybrid_set.reset(
+ create_set(_column_return_type,
_state->enable_vectorized_exec()));
+
_context.bloom_filter_func.reset(create_bloom_filter(_column_return_type));
+ _context.bloom_filter_func->set_length(params->bloom_filter_size);
return Status::OK();
}
default:
@@ -461,14 +464,15 @@ public:
<< "Can not change to bloom filter because of runtime filter
type is "
<< to_string(_filter_type);
_is_bloomfilter = true;
- insert_to_bloom_filter(_bloomfilter_func.get());
+ insert_to_bloom_filter(_context.bloom_filter_func.get());
// release in filter
- _hybrid_set.reset(create_set(_column_return_type,
_state->enable_vectorized_exec()));
+ _context.hybrid_set.reset(
+ create_set(_column_return_type,
_state->enable_vectorized_exec()));
}
void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
- if (_hybrid_set->size() > 0) {
- auto it = _hybrid_set->begin();
+ if (_context.hybrid_set->size() > 0) {
+ auto it = _context.hybrid_set->begin();
if (_use_batch) {
while (it->has_next()) {
@@ -484,7 +488,7 @@ public:
}
}
- BloomFilterFuncBase* get_bloomfilter() const { return
_bloomfilter_func.get(); }
+ BloomFilterFuncBase* get_bloomfilter() const { return
_context.bloom_filter_func.get(); }
void insert(const void* data) {
switch (_filter_type) {
@@ -492,22 +496,22 @@ public:
if (_is_ignored_in_filter) {
break;
}
- _hybrid_set->insert(data);
+ _context.hybrid_set->insert(data);
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
- _minmax_func->insert(data);
+ _context.minmax_func->insert(data);
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
- _bloomfilter_func->insert(data);
+ _context.bloom_filter_func->insert(data);
break;
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
if (_is_bloomfilter) {
- _bloomfilter_func->insert(data);
+ _context.bloom_filter_func->insert(data);
} else {
- _hybrid_set->insert(data);
+ _context.hybrid_set->insert(data);
}
break;
}
@@ -523,22 +527,22 @@ public:
if (_is_ignored_in_filter) {
break;
}
- _hybrid_set->insert_fixed_len(data, offsets, number);
+ _context.hybrid_set->insert_fixed_len(data, offsets, number);
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
- _minmax_func->insert_fixed_len(data, offsets, number);
+ _context.minmax_func->insert_fixed_len(data, offsets, number);
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
- _bloomfilter_func->insert_fixed_len(data, offsets, number);
+ _context.bloom_filter_func->insert_fixed_len(data, offsets,
number);
break;
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
if (_is_bloomfilter) {
- _bloomfilter_func->insert_fixed_len(data, offsets, number);
+ _context.bloom_filter_func->insert_fixed_len(data, offsets,
number);
} else {
- _hybrid_set->insert_fixed_len(data, offsets, number);
+ _context.hybrid_set->insert_fixed_len(data, offsets, number);
}
break;
}
@@ -618,18 +622,18 @@ public:
_is_ignored_in_filter = true;
_ignored_in_filter_msg = wrapper->_ignored_in_filter_msg;
// release in filter
- _hybrid_set.reset(
+ _context.hybrid_set.reset(
create_set(_column_return_type,
_state->enable_vectorized_exec()));
break;
}
// try insert set
- _hybrid_set->insert(wrapper->_hybrid_set.get());
- if (_max_in_num >= 0 && _hybrid_set->size() >= _max_in_num) {
+ _context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
+ if (_max_in_num >= 0 && _context.hybrid_set->size() >=
_max_in_num) {
#ifdef VLOG_DEBUG_IS_ON
std::stringstream msg;
msg << "fragment instance " <<
_fragment_instance_id.to_string()
<< " ignore merge runtime filter(in filter id " <<
_filter_id
- << ") because: in_num(" << _hybrid_set->size() << ") >=
max_in_num("
+ << ") because: in_num(" << _context.hybrid_set->size() <<
") >= max_in_num("
<< _max_in_num << ")";
_ignored_in_filter_msg = _pool->add(new
std::string(msg.str()));
#else
@@ -638,17 +642,17 @@ public:
_is_ignored_in_filter = true;
// release in filter
- _hybrid_set.reset(
+ _context.hybrid_set.reset(
create_set(_column_return_type,
_state->enable_vectorized_exec()));
}
break;
}
case RuntimeFilterType::MINMAX_FILTER: {
- _minmax_func->merge(wrapper->_minmax_func.get(), _pool);
+ _context.minmax_func->merge(wrapper->_context.minmax_func.get(),
_pool);
break;
}
case RuntimeFilterType::BLOOM_FILTER: {
- _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
break;
}
case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
@@ -661,11 +665,11 @@ public:
<< " can not ignore merge runtime filter(in filter
id "
<< wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
<< *(wrapper->get_ignored_in_filter_msg());
- _hybrid_set->insert(wrapper->_hybrid_set.get());
- if (_max_in_num >= 0 && _hybrid_set->size() >=
_max_in_num) {
+
_context.hybrid_set->insert(wrapper->_context.hybrid_set.get());
+ if (_max_in_num >= 0 && _context.hybrid_set->size() >=
_max_in_num) {
VLOG_DEBUG << "fragment instance " <<
_fragment_instance_id.to_string()
<< " change runtime filter to bloom
filter(id=" << _filter_id
- << ") because: in_num(" <<
_hybrid_set->size()
+ << ") because: in_num(" <<
_context.hybrid_set->size()
<< ") >= max_in_num(" << _max_in_num << ")";
change_to_bloom_filter();
}
@@ -675,7 +679,7 @@ public:
<< " change runtime filter to bloom filter(id="
<< _filter_id
<< ") because: already exist a bloom filter";
change_to_bloom_filter();
- _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
}
} else {
if (wrapper->_filter_type ==
@@ -685,10 +689,10 @@ public:
<< " can not ignore merge runtime filter(in filter
id "
<< wrapper->_filter_id << ") when used
IN_OR_BLOOM_FILTER, ignore msg: "
<< *(wrapper->get_ignored_in_filter_msg());
- wrapper->insert_to_bloom_filter(_bloomfilter_func.get());
+
wrapper->insert_to_bloom_filter(_context.bloom_filter_func.get());
// bloom filter merge bloom filter
} else {
- _bloomfilter_func->merge(wrapper->_bloomfilter_func.get());
+
_context.bloom_filter_func->merge(wrapper->_context.bloom_filter_func.get());
}
}
break;
@@ -709,7 +713,7 @@ public:
_ignored_in_filter_msg = _pool->add(new
std::string(in_filter->ignored_msg()));
return Status::OK();
}
- _hybrid_set.reset(create_set(type, _state->enable_vectorized_exec()));
+ _context.hybrid_set.reset(create_set(type,
_state->enable_vectorized_exec()));
switch (type) {
case TYPE_BOOLEAN: {
batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column,
@@ -880,40 +884,40 @@ public:
_is_bloomfilter = true;
// we won't use this class to insert or find any data
// so any type is ok
- _bloomfilter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT));
- return _bloomfilter_func->assign(data, bloom_filter->filter_length());
+
_context.bloom_filter_func.reset(create_bloom_filter(PrimitiveType::TYPE_INT));
+ return _context.bloom_filter_func->assign(data,
bloom_filter->filter_length());
}
// used by shuffle runtime filter
// assign this filter by protobuf
Status assign(const PMinMaxFilter* minmax_filter) {
PrimitiveType type = to_primitive_type(minmax_filter->column_type());
- _minmax_func.reset(create_minmax_filter(type));
+ _context.minmax_func.reset(create_minmax_filter(type));
switch (type) {
case TYPE_BOOLEAN: {
bool min_val = minmax_filter->min_val().boolval();
bool max_val = minmax_filter->max_val().boolval();
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_TINYINT: {
int8_t min_val =
static_cast<int8_t>(minmax_filter->min_val().intval());
int8_t max_val =
static_cast<int8_t>(minmax_filter->max_val().intval());
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_SMALLINT: {
int16_t min_val =
static_cast<int16_t>(minmax_filter->min_val().intval());
int16_t max_val =
static_cast<int16_t>(minmax_filter->max_val().intval());
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_INT: {
int32_t min_val = minmax_filter->min_val().intval();
int32_t max_val = minmax_filter->max_val().intval();
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_BIGINT: {
int64_t min_val = minmax_filter->min_val().longval();
int64_t max_val = minmax_filter->max_val().longval();
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_LARGEINT: {
auto min_string_val = minmax_filter->min_val().stringval();
@@ -925,27 +929,27 @@ public:
int128_t max_val = StringParser::string_to_int<int128_t>(
max_string_val.c_str(), max_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_FLOAT: {
float min_val =
static_cast<float>(minmax_filter->min_val().doubleval());
float max_val =
static_cast<float>(minmax_filter->max_val().doubleval());
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DOUBLE: {
double min_val =
static_cast<double>(minmax_filter->min_val().doubleval());
double max_val =
static_cast<double>(minmax_filter->max_val().doubleval());
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DATEV2: {
int32_t min_val = minmax_filter->min_val().intval();
int32_t max_val = minmax_filter->max_val().intval();
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DATETIMEV2: {
int64_t min_val = minmax_filter->min_val().longval();
int64_t max_val = minmax_filter->max_val().longval();
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DATETIME:
case TYPE_DATE: {
@@ -955,24 +959,24 @@ public:
DateTimeValue max_val;
min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DECIMALV2: {
auto& min_val_ref = minmax_filter->min_val().stringval();
auto& max_val_ref = minmax_filter->max_val().stringval();
DecimalV2Value min_val(min_val_ref);
DecimalV2Value max_val(max_val_ref);
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DECIMAL32: {
int32_t min_val = minmax_filter->min_val().intval();
int32_t max_val = minmax_filter->max_val().intval();
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DECIMAL64: {
int64_t min_val = minmax_filter->min_val().longval();
int64_t max_val = minmax_filter->max_val().longval();
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_DECIMAL128I: {
auto min_string_val = minmax_filter->min_val().stringval();
@@ -984,7 +988,7 @@ public:
int128_t max_val = StringParser::string_to_int<int128_t>(
max_string_val.c_str(), max_string_val.length(), &result);
DCHECK(result == StringParser::PARSE_SUCCESS);
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
case TYPE_VARCHAR:
case TYPE_CHAR:
@@ -995,7 +999,7 @@ public:
auto max_val_ptr = _pool->add(new std::string(max_val_ref));
StringValue min_val(const_cast<char*>(min_val_ptr->c_str()),
min_val_ptr->length());
StringValue max_val(const_cast<char*>(max_val_ptr->c_str()),
max_val_ptr->length());
- return _minmax_func->assign(&min_val, &max_val);
+ return _context.minmax_func->assign(&min_val, &max_val);
}
default:
DCHECK(false) << "unknown type";
@@ -1005,17 +1009,17 @@ public:
}
Status get_in_filter_iterator(HybridSetBase::IteratorBase** it) {
- *it = _hybrid_set->begin();
+ *it = _context.hybrid_set->begin();
return Status::OK();
}
Status get_bloom_filter_desc(char** data, int* filter_length) {
- return _bloomfilter_func->get_data(data, filter_length);
+ return _context.bloom_filter_func->get_data(data, filter_length);
}
Status get_minmax_filter_desc(void** min_data, void** max_data) {
- *min_data = _minmax_func->get_min();
- *max_data = _minmax_func->get_max();
+ *min_data = _context.minmax_func->get_min();
+ *max_data = _context.minmax_func->get_max();
return Status::OK();
}
@@ -1027,13 +1031,13 @@ public:
case TYPE_VARCHAR:
case TYPE_CHAR:
case TYPE_STRING: {
- StringValue* min_value =
static_cast<StringValue*>(_minmax_func->get_min());
- StringValue* max_value =
static_cast<StringValue*>(_minmax_func->get_max());
+ StringValue* min_value =
static_cast<StringValue*>(_context.minmax_func->get_min());
+ StringValue* max_value =
static_cast<StringValue*>(_context.minmax_func->get_max());
auto min_val_ptr = _pool->add(new std::string(min_value->ptr));
auto max_val_ptr = _pool->add(new std::string(max_value->ptr));
StringValue min_val(const_cast<char*>(min_val_ptr->c_str()),
min_val_ptr->length());
StringValue max_val(const_cast<char*>(max_val_ptr->c_str()),
max_val_ptr->length());
- _minmax_func->assign(&min_val, &max_val);
+ _context.minmax_func->assign(&min_val, &max_val);
}
default:
break;
@@ -1052,11 +1056,11 @@ public:
PColumnValue&, ObjectPool*)) {
for (int i = 0; i < filter->values_size(); ++i) {
PColumnValue column = filter->values(i);
- assign_func(_hybrid_set, column, _pool);
+ assign_func(_context.hybrid_set, column, _pool);
}
}
- size_t get_in_filter_size() const { return _hybrid_set->size(); }
+ size_t get_in_filter_size() const { return _context.hybrid_set->size(); }
friend class IRuntimeFilter;
@@ -1068,9 +1072,8 @@ private:
PrimitiveType _column_return_type; // column type
RuntimeFilterType _filter_type;
int32_t _max_in_num = -1;
- std::shared_ptr<MinMaxFuncBase> _minmax_func;
- std::shared_ptr<HybridSetBase> _hybrid_set;
- std::shared_ptr<BloomFilterFuncBase> _bloomfilter_func;
+
+ vectorized::SharedRuntimeFilterContext _context;
bool _is_bloomfilter = false;
bool _is_ignored_in_filter = false;
std::string* _ignored_in_filter_msg = nullptr;
@@ -1090,12 +1093,12 @@ Status IRuntimeFilter::create(RuntimeState* state,
ObjectPool* pool, const TRunt
return (*res)->init_with_desc(desc, query_options, fragment_instance_id,
node_id);
}
-Status IRuntimeFilter::apply_from_other(IRuntimeFilter* other) {
- _wrapper->_hybrid_set = other->_wrapper->_hybrid_set;
- _wrapper->_bloomfilter_func = other->_wrapper->_bloomfilter_func;
- _wrapper->_minmax_func = other->_wrapper->_minmax_func;
- _wrapper->_filter_type = other->_wrapper->_filter_type;
- _runtime_filter_type = other->_runtime_filter_type;
+void
IRuntimeFilter::copy_to_shared_context(vectorized::SharedRuntimeFilterContext&
context) {
+ context = _wrapper->_context;
+}
+
+Status
IRuntimeFilter::copy_from_shared_context(vectorized::SharedRuntimeFilterContext&
context) {
+ _wrapper->_context = context;
return Status::OK();
}
@@ -1736,7 +1739,7 @@ Status RuntimePredicateWrapper::get_push_context(T*
container, RuntimeState* sta
node.__isset.vector_opcode = true;
node.__set_vector_opcode(to_in_opcode(_column_return_type));
auto in_pred = _pool->add(new InPredicate(node));
- RETURN_IF_ERROR(in_pred->prepare(state, _hybrid_set.get()));
+ RETURN_IF_ERROR(in_pred->prepare(state,
_context.hybrid_set.get()));
in_pred->add_child(Expr::copy(_pool, prob_expr->root()));
ExprContext* ctx = _pool->add(new ExprContext(in_pred));
container->push_back(ctx);
@@ -1748,7 +1751,8 @@ Status RuntimePredicateWrapper::get_push_context(T*
container, RuntimeState* sta
Expr* max_literal = nullptr;
auto max_pred = create_bin_predicate(_pool, _column_return_type,
TExprOpcode::LE);
RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
- _minmax_func->get_max(),
(void**)&max_literal));
+ _context.minmax_func->get_max(),
+ (void**)&max_literal));
max_pred->add_child(Expr::copy(_pool, prob_expr->root()));
max_pred->add_child(max_literal);
container->push_back(_pool->add(new ExprContext(max_pred)));
@@ -1756,7 +1760,8 @@ Status RuntimePredicateWrapper::get_push_context(T*
container, RuntimeState* sta
Expr* min_literal = nullptr;
auto min_pred = create_bin_predicate(_pool, _column_return_type,
TExprOpcode::GE);
RETURN_IF_ERROR(create_literal<false>(_pool, prob_expr->root()->type(),
- _minmax_func->get_min(),
(void**)&min_literal));
+ _context.minmax_func->get_min(),
+ (void**)&min_literal));
min_pred->add_child(Expr::copy(_pool, prob_expr->root()));
min_pred->add_child(min_literal);
container->push_back(_pool->add(new ExprContext(min_pred)));
@@ -1772,7 +1777,7 @@ Status RuntimePredicateWrapper::get_push_context(T*
container, RuntimeState* sta
node.__isset.vector_opcode = true;
node.__set_vector_opcode(to_in_opcode(_column_return_type));
auto bloom_pred = _pool->add(new BloomFilterPredicate(node));
- RETURN_IF_ERROR(bloom_pred->prepare(state, _bloomfilter_func));
+ RETURN_IF_ERROR(bloom_pred->prepare(state,
_context.bloom_filter_func));
bloom_pred->add_child(Expr::copy(_pool, prob_expr->root()));
ExprContext* ctx = _pool->add(new ExprContext(bloom_pred));
container->push_back(ctx);
@@ -1811,7 +1816,7 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
node.__set_is_nullable(false);
auto in_pred = _pool->add(new
vectorized::VDirectInPredicate(node));
- in_pred->set_filter(_hybrid_set);
+ in_pred->set_filter(_context.hybrid_set);
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
in_pred->add_child(cloned_vexpr);
auto wrapper = _pool->add(new
vectorized::VRuntimeFilterWrapper(node, in_pred));
@@ -1827,7 +1832,8 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
&max_pred, &max_pred_node));
doris::vectorized::VExpr* max_literal = nullptr;
RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
- _minmax_func->get_max(),
(void**)&max_literal));
+ _context.minmax_func->get_max(),
+ (void**)&max_literal));
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
max_pred->add_child(cloned_vexpr);
max_pred->add_child(max_literal);
@@ -1841,7 +1847,8 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
&min_pred, &min_pred_node));
doris::vectorized::VExpr* min_literal = nullptr;
RETURN_IF_ERROR(create_literal<true>(_pool, vprob_expr->root()->type(),
- _minmax_func->get_min(),
(void**)&min_literal));
+ _context.minmax_func->get_min(),
+ (void**)&min_literal));
cloned_vexpr = vprob_expr->root()->clone(_pool);
min_pred->add_child(cloned_vexpr);
min_pred->add_child(min_literal);
@@ -1861,7 +1868,7 @@ Status
RuntimePredicateWrapper::get_push_vexprs(std::vector<doris::vectorized::V
node.__set_vector_opcode(to_in_opcode(_column_return_type));
node.__set_is_nullable(false);
auto bloom_pred = _pool->add(new vectorized::VBloomPredicate(node));
- bloom_pred->set_filter(_bloomfilter_func);
+ bloom_pred->set_filter(_context.bloom_filter_func);
auto cloned_vexpr = vprob_expr->root()->clone(_pool);
bloom_pred->add_child(cloned_vexpr);
auto wrapper = _pool->add(new vectorized::VRuntimeFilterWrapper(node,
bloom_pred));
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index e84da75a5d..89c82abfef 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -47,6 +47,7 @@ class BloomFilterFuncBase;
namespace vectorized {
class VExpr;
class VExprContext;
+struct SharedRuntimeFilterContext;
} // namespace vectorized
enum class RuntimeFilterType {
@@ -141,7 +142,8 @@ public:
const TQueryOptions* query_options, const
RuntimeFilterRole role,
int node_id, IRuntimeFilter** res);
- Status apply_from_other(IRuntimeFilter* other);
+ void copy_to_shared_context(vectorized::SharedRuntimeFilterContext&
context);
+ Status copy_from_shared_context(vectorized::SharedRuntimeFilterContext&
context);
// insert data to build filter
// only used for producer
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index b13fa69625..c6fa1d2381 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -23,6 +23,7 @@
#include "vec/columns/column_nullable.h"
#include "vec/columns/columns_number.h"
#include "vec/common/assert_cast.h"
+#include "vec/runtime/shared_hash_table_controller.h"
namespace doris {
// this class used in a hash join node
@@ -229,19 +230,24 @@ public:
}
}
- Status apply_from_other(RuntimeFilterSlotsBase<ExprCtxType>* other) {
+ void copy_to_shared_context(vectorized::SharedHashTableContextPtr&
context) {
+ for (auto& it : _runtime_filters) {
+ for (auto& filter : it.second) {
+ auto& target = context->runtime_filters[filter->filter_id()];
+ filter->copy_to_shared_context(target);
+ }
+ }
+ }
+
+ Status copy_from_shared_context(vectorized::SharedHashTableContextPtr&
context) {
for (auto& it : _runtime_filters) {
- auto& other_filters = other->_runtime_filters[it.first];
for (auto& filter : it.second) {
auto filter_id = filter->filter_id();
- auto ret = std::find_if(other_filters.begin(),
other_filters.end(),
- [&](IRuntimeFilter* other_filter) {
- return other_filter->filter_id()
== filter_id;
- });
- if (ret == other_filters.end()) {
+ auto ret = context->runtime_filters.find(filter_id);
+ if (ret == context->runtime_filters.end()) {
return Status::Aborted("invalid runtime filter id: {}",
filter_id);
}
- filter->apply_from_other(*ret);
+ filter->copy_from_shared_context(ret->second);
}
}
return Status::OK();
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 29cdd17320..09ccb4ef23 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -240,13 +240,7 @@ Status FragmentExecState::execute() {
}
#ifndef BE_TEST
if (_executor.runtime_state()->is_cancelled()) {
- Status status = Status::Cancelled("cancelled before execution");
- _executor.runtime_state()
- ->get_query_fragments_ctx()
- ->get_shared_hash_table_controller()
-
->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(),
- status);
- return status;
+ return Status::Cancelled("cancelled before execution");
}
#endif
int64_t duration_ns = 0;
@@ -254,19 +248,11 @@ Status FragmentExecState::execute() {
SCOPED_RAW_TIMER(&duration_ns);
CgroupsMgr::apply_system_cgroup();
opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start
executing Fragment");
- Status status = _executor.open();
- WARN_IF_ERROR(status,
+ WARN_IF_ERROR(_executor.open(),
strings::Substitute("Got error while opening fragment
$0, query id: $1",
print_id(_fragment_instance_id),
print_id(_query_id)));
_executor.close();
- if (!status.ok()) {
- _executor.runtime_state()
- ->get_query_fragments_ctx()
- ->get_shared_hash_table_controller()
-
->release_ref_count_if_need(_executor.runtime_state()->fragment_instance_id(),
- status);
- }
}
DorisMetrics::instance()->fragment_requests_total->increment(1);
DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns /
1000);
@@ -712,9 +698,6 @@ Status FragmentMgr::exec_plan_fragment(const
TExecPlanFragmentParams& params, Fi
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
- _setup_shared_hashtable_for_broadcast_join(params,
exec_state->executor()->runtime_state(),
- fragments_ctx.get());
-
std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
_runtimefilter_controller.add_entity(params, &handler,
exec_state->executor()->runtime_state());
exec_state->set_merge_controller_handler(handler);
@@ -784,26 +767,6 @@ void FragmentMgr::_set_scan_concurrency(const
TExecPlanFragmentParams& params,
#endif
}
-void FragmentMgr::_setup_shared_hashtable_for_broadcast_join(const
TExecPlanFragmentParams& params,
- RuntimeState*
state,
-
QueryFragmentsCtx* fragments_ctx) {
- if (!params.__isset.fragment || !params.fragment.__isset.plan ||
- params.fragment.plan.nodes.empty()) {
- return;
- }
-
- for (auto& node : params.fragment.plan.nodes) {
- if (node.node_type != TPlanNodeType::HASH_JOIN_NODE ||
- !node.hash_join_node.__isset.is_broadcast_join ||
- !node.hash_join_node.is_broadcast_join) {
- continue;
- }
-
- std::lock_guard<std::mutex> lock(_lock_for_shared_hash_table);
-
fragments_ctx->get_shared_hash_table_controller()->acquire_ref_count(state,
node.node_id);
- }
-}
-
bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) {
return type == TPlanNodeType::OLAP_SCAN_NODE || type ==
TPlanNodeType::MYSQL_SCAN_NODE ||
type == TPlanNodeType::SCHEMA_SCAN_NODE || type ==
TPlanNodeType::META_SCAN_NODE ||
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 7c6a079e32..2246a42ac8 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -107,10 +107,6 @@ private:
bool _is_scan_node(const TPlanNodeType::type& type);
- void _setup_shared_hashtable_for_broadcast_join(const
TExecPlanFragmentParams& params,
- RuntimeState* state,
- QueryFragmentsCtx*
fragments_ctx);
-
// This is input params
ExecEnv* _exec_env;
diff --git a/be/src/runtime/query_fragments_ctx.h
b/be/src/runtime/query_fragments_ctx.h
index 1f7c053db1..4bfff8c4da 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -100,8 +100,8 @@ public:
return _ready_to_execute.load() && !_is_cancelled.load();
}
- vectorized::SharedHashTableController* get_shared_hash_table_controller() {
- return _shared_hash_table_controller.get();
+ std::shared_ptr<vectorized::SharedHashTableController>
get_shared_hash_table_controller() {
+ return _shared_hash_table_controller;
}
public:
@@ -144,7 +144,7 @@ private:
std::atomic<bool> _ready_to_execute {false};
std::atomic<bool> _is_cancelled {false};
- std::unique_ptr<vectorized::SharedHashTableController>
_shared_hash_table_controller;
+ std::shared_ptr<vectorized::SharedHashTableController>
_shared_hash_table_controller;
};
} // namespace doris
diff --git a/be/src/runtime/runtime_filter_mgr.cpp
b/be/src/runtime/runtime_filter_mgr.cpp
index 7bc894e1da..ce851c119a 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -78,8 +78,9 @@ Status RuntimeFilterMgr::get_producer_filter(const int
filter_id,
return get_filter_by_role(filter_id, RuntimeFilterRole::PRODUCER,
producer_filter);
}
-Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const
TRuntimeFilterDesc& desc,
- const TQueryOptions& options, int
node_id) {
+Status RuntimeFilterMgr::register_filter(const RuntimeFilterRole role,
+ const TRuntimeFilterDesc& desc,
+ const TQueryOptions& options, int
node_id) {
DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) ||
role != RuntimeFilterRole::CONSUMER);
SCOPED_CONSUME_MEM_TRACKER(_tracker.get());
diff --git a/be/src/runtime/runtime_filter_mgr.h
b/be/src/runtime/runtime_filter_mgr.h
index db392bcc54..002f85c20b 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -69,8 +69,8 @@ public:
Status get_producer_filter(const int filter_id, IRuntimeFilter**
producer_filter);
// regist filter
- Status regist_filter(const RuntimeFilterRole role, const
TRuntimeFilterDesc& desc,
- const TQueryOptions& options, int node_id = -1);
+ Status register_filter(const RuntimeFilterRole role, const
TRuntimeFilterDesc& desc,
+ const TQueryOptions& options, int node_id = -1);
// update filter by remote
Status update_filter(const PPublishFilterRequest* request,
diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h
index cc9d8a4831..7b8ae4d89d 100644
--- a/be/src/runtime/runtime_state.h
+++ b/be/src/runtime/runtime_state.h
@@ -397,6 +397,11 @@ public:
bool enable_profile() const { return _query_options.is_report_success; }
+ bool enable_share_hash_table_for_broadcast_join() const {
+ return
_query_options.__isset.enable_share_hash_table_for_broadcast_join &&
+ _query_options.enable_share_hash_table_for_broadcast_join;
+ }
+
private:
// Use a custom block manager for the query for testing purposes.
void set_block_mgr2(const std::shared_ptr<BufferedBlockMgr2>& block_mgr) {
diff --git a/be/src/vec/exec/join/process_hash_table_probe_impl.h
b/be/src/vec/exec/join/process_hash_table_probe_impl.h
index 7da2100784..9367c299c3 100644
--- a/be/src/vec/exec/join/process_hash_table_probe_impl.h
+++ b/be/src/vec/exec/join/process_hash_table_probe_impl.h
@@ -28,7 +28,7 @@ template <int JoinOpType>
ProcessHashTableProbe<JoinOpType>::ProcessHashTableProbe(HashJoinNode*
join_node, int batch_size)
: _join_node(join_node),
_batch_size(batch_size),
- _build_blocks(join_node->_build_blocks),
+ _build_blocks(*join_node->_build_blocks),
_tuple_is_null_left_flags(join_node->_is_outer_join
? &(reinterpret_cast<ColumnUInt8&>(
*join_node->_tuple_is_null_left_flag_column)
@@ -207,15 +207,14 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process(HashTableType& hash_table_c
}
int last_offset = current_offset;
auto find_result =
- !need_null_map_for_probe ?
key_getter.find_key(*hash_table_ctx.hash_table_ptr,
-
probe_index, _arena)
+ !need_null_map_for_probe
+ ? key_getter.find_key(hash_table_ctx.hash_table,
probe_index, _arena)
: (*null_map)[probe_index]
- ?
decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr,
- probe_index,
_arena)) {nullptr, false}
- :
key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index,
- _arena);
+ ?
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
+ _arena)) {nullptr,
false}
+ : key_getter.find_key(hash_table_ctx.hash_table,
probe_index, _arena);
if (probe_index + PREFETCH_STEP < probe_rows)
- key_getter.template
prefetch<true>(*hash_table_ctx.hash_table_ptr,
+ key_getter.template prefetch<true>(hash_table_ctx.hash_table,
probe_index +
PREFETCH_STEP, _arena);
if constexpr (JoinOpType == TJoinOp::LEFT_ANTI_JOIN ||
@@ -372,15 +371,14 @@ Status
ProcessHashTableProbe<JoinOpType>::do_process_with_other_join_conjuncts(
auto last_offset = current_offset;
auto find_result =
- !need_null_map_for_probe ?
key_getter.find_key(*hash_table_ctx.hash_table_ptr,
-
probe_index, _arena)
+ !need_null_map_for_probe
+ ? key_getter.find_key(hash_table_ctx.hash_table,
probe_index, _arena)
: (*null_map)[probe_index]
- ?
decltype(key_getter.find_key(*hash_table_ctx.hash_table_ptr,
- probe_index,
_arena)) {nullptr, false}
- :
key_getter.find_key(*hash_table_ctx.hash_table_ptr, probe_index,
- _arena);
+ ?
decltype(key_getter.find_key(hash_table_ctx.hash_table, probe_index,
+ _arena)) {nullptr,
false}
+ : key_getter.find_key(hash_table_ctx.hash_table,
probe_index, _arena);
if (probe_index + PREFETCH_STEP < probe_rows)
- key_getter.template
prefetch<true>(*hash_table_ctx.hash_table_ptr,
+ key_getter.template prefetch<true>(hash_table_ctx.hash_table,
probe_index +
PREFETCH_STEP, _arena);
if (find_result.is_found()) {
auto& mapped = find_result.get_mapped();
@@ -627,7 +625,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
};
- for (; iter != hash_table_ctx.hash_table_ptr->end() && block_size <
_batch_size; ++iter) {
+ for (; iter != hash_table_ctx.hash_table.end() && block_size <
_batch_size; ++iter) {
auto& mapped = iter->get_second();
if constexpr (std::is_same_v<Mapped, RowRefListWithFlag>) {
if (mapped.visited) {
@@ -674,7 +672,7 @@ Status
ProcessHashTableProbe<JoinOpType>::process_data_in_hashtable(HashTableTyp
}
_tuple_is_null_left_flags->resize_fill(block_size, 1);
}
- *eos = iter == hash_table_ctx.hash_table_ptr->end();
+ *eos = iter == hash_table_ctx.hash_table.end();
output_block->swap(
mutable_block.to_block(right_semi_anti_without_other ?
right_col_idx : 0));
return Status::OK();
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp
b/be/src/vec/exec/join/vhash_join_node.cpp
index da144378b1..3778b3688f 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -221,7 +221,7 @@ struct ProcessRuntimeFilterBuild {
_join_node->_runtime_filter_descs));
RETURN_IF_ERROR(_join_node->_runtime_filter_slots->init(
- state, hash_table_ctx.hash_table_ptr->get_size()));
+ state, hash_table_ctx.hash_table.get_size()));
if (!_join_node->_runtime_filter_slots->empty() &&
!_join_node->_inserted_rows.empty()) {
{
@@ -250,14 +250,15 @@ HashJoinNode::HashJoinNode(ObjectPool* pool, const
TPlanNode& tnode, const Descr
?
tnode.hash_join_node.hash_output_slot_ids
: std::vector<SlotId> {}) {
_runtime_filter_descs = tnode.runtime_filters;
- _arena = std::make_unique<Arena>();
- _hash_table_variants = std::make_unique<HashTableVariants>();
+ _arena = std::make_shared<Arena>();
+ _hash_table_variants = std::make_shared<HashTableVariants>();
_process_hashtable_ctx_variants = std::make_unique<HashTableCtxVariants>();
+ _build_blocks.reset(new std::vector<Block>());
// avoid vector expand change block address.
// one block can store 4g data, _build_blocks can store 128*4g data.
// if probe data bigger than 512g, runtime filter maybe will core dump
when insert data.
- _build_blocks.reserve(_MAX_BUILD_BLOCK_COUNT);
+ _build_blocks->reserve(_MAX_BUILD_BLOCK_COUNT);
}
Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
@@ -313,7 +314,7 @@ Status HashJoinNode::init(const TPlanNode& tnode,
RuntimeState* state) {
_runtime_filters.resize(_runtime_filter_descs.size());
for (size_t i = 0; i < _runtime_filter_descs.size(); i++) {
- RETURN_IF_ERROR(state->runtime_filter_mgr()->regist_filter(
+ RETURN_IF_ERROR(state->runtime_filter_mgr()->register_filter(
RuntimeFilterRole::PRODUCER, _runtime_filter_descs[i],
state->query_options()));
RETURN_IF_ERROR(state->runtime_filter_mgr()->get_producer_filter(
_runtime_filter_descs[i].filter_id, &_runtime_filters[i]));
@@ -349,18 +350,18 @@ Status HashJoinNode::prepare(RuntimeState* state) {
RETURN_IF_ERROR(VJoinNodeBase::prepare(state));
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
// Build phase
- auto build_phase_profile = runtime_profile()->create_child("BuildPhase",
true, true);
- runtime_profile()->add_child(build_phase_profile, false, nullptr);
- _build_timer = ADD_TIMER(build_phase_profile, "BuildTime");
- _build_table_timer = ADD_TIMER(build_phase_profile, "BuildTableTime");
- _build_side_merge_block_timer = ADD_TIMER(build_phase_profile,
"BuildSideMergeBlockTime");
- _build_table_insert_timer = ADD_TIMER(build_phase_profile,
"BuildTableInsertTime");
- _build_expr_call_timer = ADD_TIMER(build_phase_profile,
"BuildExprCallTime");
- _build_table_expanse_timer = ADD_TIMER(build_phase_profile,
"BuildTableExpanseTime");
+ _build_phase_profile = runtime_profile()->create_child("BuildPhase", true,
true);
+ runtime_profile()->add_child(_build_phase_profile, false, nullptr);
+ _build_timer = ADD_TIMER(_build_phase_profile, "BuildTime");
+ _build_table_timer = ADD_TIMER(_build_phase_profile, "BuildTableTime");
+ _build_side_merge_block_timer = ADD_TIMER(_build_phase_profile,
"BuildSideMergeBlockTime");
+ _build_table_insert_timer = ADD_TIMER(_build_phase_profile,
"BuildTableInsertTime");
+ _build_expr_call_timer = ADD_TIMER(_build_phase_profile,
"BuildExprCallTime");
+ _build_table_expanse_timer = ADD_TIMER(_build_phase_profile,
"BuildTableExpanseTime");
_build_table_convert_timer =
- ADD_TIMER(build_phase_profile,
"BuildTableConvertToPartitionedTime");
- _build_rows_counter = ADD_COUNTER(build_phase_profile, "BuildRows",
TUnit::UNIT);
- _build_side_compute_hash_timer = ADD_TIMER(build_phase_profile,
"BuildSideHashComputingTime");
+ ADD_TIMER(_build_phase_profile,
"BuildTableConvertToPartitionedTime");
+ _build_rows_counter = ADD_COUNTER(_build_phase_profile, "BuildRows",
TUnit::UNIT);
+ _build_side_compute_hash_timer = ADD_TIMER(_build_phase_profile,
"BuildSideHashComputingTime");
// Probe phase
auto probe_phase_profile = runtime_profile()->create_child("ProbePhase",
true, true);
@@ -379,8 +380,19 @@ Status HashJoinNode::prepare(RuntimeState* state) {
_build_buckets_counter = ADD_COUNTER(runtime_profile(), "BuildBuckets",
TUnit::UNIT);
_build_buckets_fill_counter = ADD_COUNTER(runtime_profile(),
"FilledBuckets", TUnit::UNIT);
+ _should_build_hash_table = true;
if (_is_broadcast_join) {
runtime_profile()->add_info_string("BroadcastJoin", "true");
+ if (state->enable_share_hash_table_for_broadcast_join()) {
+ runtime_profile()->add_info_string("ShareHashTableEnabled",
"true");
+ _shared_hashtable_controller =
+
state->get_query_fragments_ctx()->get_shared_hash_table_controller();
+ _shared_hash_table_context =
_shared_hashtable_controller->get_context(id());
+ _should_build_hash_table =
_shared_hashtable_controller->should_build_hash_table(
+ state->fragment_instance_id(), id());
+ } else {
+ runtime_profile()->add_info_string("ShareHashTableEnabled",
"false");
+ }
}
RETURN_IF_ERROR(VExpr::prepare(_build_expr_ctxs, state,
child(1)->row_desc()));
@@ -398,7 +410,6 @@ Status HashJoinNode::prepare(RuntimeState* state) {
// Hash Table Init
_hash_table_init(state);
- _process_hashtable_ctx_variants_init(state);
_construct_mutable_join_block();
return Status::OK();
@@ -417,11 +428,6 @@ Status HashJoinNode::close(RuntimeState* state) {
return Status::OK();
}
- if (_shared_hashtable_controller) {
- _shared_hashtable_controller->release_ref_count(state, id());
- _shared_hashtable_controller->wait_for_closable(state, id());
- }
-
START_AND_SCOPE_SPAN(state->get_tracer(), span, "HashJoinNode::close");
VExpr::close(_build_expr_ctxs, state);
VExpr::close(_probe_expr_ctxs, state);
@@ -610,6 +616,7 @@ Status HashJoinNode::open(RuntimeState* state) {
Status HashJoinNode::_materialize_build_side(RuntimeState* state) {
RETURN_IF_ERROR(child(1)->open(state));
+
SCOPED_TIMER(_build_timer);
MutableBlock mutable_block(child(1)->row_desc().tuple_descriptors());
@@ -620,111 +627,128 @@ Status
HashJoinNode::_materialize_build_side(RuntimeState* state) {
// make one block for each 4 gigabytes
constexpr static auto BUILD_BLOCK_MAX_SIZE = 4 * 1024UL * 1024UL * 1024UL;
- auto should_build_hash_table = true;
- if (_is_broadcast_join) {
- _shared_hashtable_controller =
-
state->get_query_fragments_ctx()->get_shared_hash_table_controller();
- should_build_hash_table =
- _shared_hashtable_controller->should_build_hash_table(state,
id());
- }
+ if (_should_build_hash_table) {
+ Block block;
+ // If eos or have already met a null value using short-circuit
strategy, we do not need to pull
+ // data from data.
+ while (!eos && !_short_circuit_for_null_in_probe_side) {
+ block.clear_column_data();
+ RETURN_IF_CANCELLED(state);
- Block block;
- // If eos or have already met a null value using short-circuit strategy,
we do not need to pull
- // data from data.
- while (!eos && !_short_circuit_for_null_in_probe_side) {
- block.clear_column_data();
- RETURN_IF_CANCELLED(state);
-
-
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block,
&eos),
- child(1)->get_next_span(), eos);
- if (!should_build_hash_table) {
- continue;
- }
+
RETURN_IF_ERROR_AND_CHECK_SPAN(child(1)->get_next_after_projects(state, &block,
&eos),
+ child(1)->get_next_span(), eos);
- _mem_used += block.allocated_bytes();
+ _mem_used += block.allocated_bytes();
+
+ if (block.rows() != 0) {
+ SCOPED_TIMER(_build_side_merge_block_timer);
+ RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block));
+ }
- if (block.rows() != 0) {
- SCOPED_TIMER(_build_side_merge_block_timer);
- RETURN_IF_CATCH_BAD_ALLOC(mutable_block.merge(block));
+ if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
+ if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) {
+ return Status::NotSupported(
+ strings::Substitute("data size of right table in
hash join > $0",
+ BUILD_BLOCK_MAX_SIZE *
_MAX_BUILD_BLOCK_COUNT));
+ }
+ _build_blocks->emplace_back(mutable_block.to_block());
+ // TODO:: Rethink may we should do the process after we
receive all build blocks ?
+ // which is better.
+ RETURN_IF_ERROR(_process_build_block(state,
(*_build_blocks)[index], index));
+
+ mutable_block = MutableBlock();
+ ++index;
+ last_mem_used = _mem_used;
+ }
}
- if (UNLIKELY(_mem_used - last_mem_used > BUILD_BLOCK_MAX_SIZE)) {
- if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) {
+ if (!mutable_block.empty() && !_short_circuit_for_null_in_probe_side) {
+ if (_build_blocks->size() == _MAX_BUILD_BLOCK_COUNT) {
return Status::NotSupported(
strings::Substitute("data size of right table in hash
join > $0",
BUILD_BLOCK_MAX_SIZE *
_MAX_BUILD_BLOCK_COUNT));
}
- _build_blocks.emplace_back(mutable_block.to_block());
- // TODO:: Rethink may we should do the process after we receive
all build blocks ?
- // which is better.
- RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index],
index));
-
- mutable_block = MutableBlock();
- ++index;
- last_mem_used = _mem_used;
+ _build_blocks->emplace_back(mutable_block.to_block());
+ RETURN_IF_ERROR(_process_build_block(state,
(*_build_blocks)[index], index));
}
}
+ child(1)->close(state);
- if (should_build_hash_table && !mutable_block.empty() &&
- !_short_circuit_for_null_in_probe_side) {
- if (_build_blocks.size() == _MAX_BUILD_BLOCK_COUNT) {
- return Status::NotSupported(
- strings::Substitute("data size of right table in hash join
> $0",
- BUILD_BLOCK_MAX_SIZE *
_MAX_BUILD_BLOCK_COUNT));
+ if (_should_build_hash_table) {
+ auto ret = std::visit(Overload {[&](std::monostate&) -> Status {
+ LOG(FATAL) << "FATAL: uninited
hash table";
+ __builtin_unreachable();
+ },
+ [&](auto&& arg) -> Status {
+ using HashTableCtxType =
std::decay_t<decltype(arg)>;
+
ProcessRuntimeFilterBuild<HashTableCtxType>
+
runtime_filter_build_process(this);
+ return
runtime_filter_build_process(state, arg);
+ }},
+ *_hash_table_variants);
+ if (!ret.ok()) {
+ if (_shared_hashtable_controller) {
+ _shared_hash_table_context->status = ret;
+ _shared_hashtable_controller->signal(id());
+ }
+ return ret;
+ }
+ if (_shared_hashtable_controller) {
+ _shared_hash_table_context->status = Status::OK();
+ // arena will be shared with other instances.
+ _shared_hash_table_context->arena = _arena;
+ _shared_hash_table_context->blocks = _build_blocks;
+ _shared_hash_table_context->hash_table_variants =
_hash_table_variants;
+ _shared_hash_table_context->short_circuit_for_null_in_probe_side =
+ _short_circuit_for_null_in_probe_side;
+ if (_runtime_filter_slots) {
+
_runtime_filter_slots->copy_to_shared_context(_shared_hash_table_context);
+ }
+ _shared_hashtable_controller->signal(id());
+ }
+ } else {
+ DCHECK(_shared_hashtable_controller != nullptr);
+ DCHECK(_shared_hash_table_context != nullptr);
+ auto wait_timer = ADD_TIMER(_build_phase_profile,
"WaitForSharedHashTableTime");
+ SCOPED_TIMER(wait_timer);
+ RETURN_IF_ERROR(
+ _shared_hashtable_controller->wait_for_signal(state,
_shared_hash_table_context));
+
+ _build_phase_profile->add_info_string(
+ "SharedHashTableFrom",
+
print_id(_shared_hashtable_controller->get_builder_fragment_instance_id(id())));
+ _short_circuit_for_null_in_probe_side =
+
_shared_hash_table_context->short_circuit_for_null_in_probe_side;
+ _hash_table_variants = std::static_pointer_cast<HashTableVariants>(
+ _shared_hash_table_context->hash_table_variants);
+ _build_blocks = _shared_hash_table_context->blocks;
+
+ if (!_shared_hash_table_context->runtime_filters.empty()) {
+ auto ret = std::visit(
+ Overload {[&](std::monostate&) -> Status {
+ LOG(FATAL) << "FATAL: uninited hash table";
+ __builtin_unreachable();
+ },
+ [&](auto&& arg) -> Status {
+ if (_runtime_filter_descs.empty()) {
+ return Status::OK();
+ }
+ _runtime_filter_slots = _pool->add(new
VRuntimeFilterSlots(
+ _probe_expr_ctxs, _build_expr_ctxs,
+ _runtime_filter_descs));
+
+ RETURN_IF_ERROR(_runtime_filter_slots->init(
+ state, arg.hash_table.get_size()));
+ return
_runtime_filter_slots->copy_from_shared_context(
+ _shared_hash_table_context);
+ }},
+ *_hash_table_variants);
+ RETURN_IF_ERROR(ret);
}
- _build_blocks.emplace_back(mutable_block.to_block());
- RETURN_IF_ERROR(_process_build_block(state, _build_blocks[index],
index));
}
- child(1)->close(state);
- return std::visit(
- Overload {[&](std::monostate& arg) -> Status {
- LOG(FATAL) << "FATAL: uninited hash table";
- __builtin_unreachable();
- },
- [&](auto&& arg) -> Status {
- using HashTableCtxType = std::decay_t<decltype(arg)>;
- using HashTableType = typename
HashTableCtxType::HashTable;
- if (!should_build_hash_table) {
- auto& ret =
_shared_hashtable_controller->wait_for_hash_table(id());
- if (!ret.status.ok()) {
- return ret.status;
- }
- _short_circuit_for_null_in_probe_side =
- _shared_hashtable_controller
-
->short_circuit_for_null_in_probe_side();
- arg.hash_table_ptr =
-
reinterpret_cast<HashTableType*>(ret.hash_table_ptr);
- _build_blocks = *ret.blocks;
- _runtime_filter_slots = _pool->add(new
VRuntimeFilterSlots(
- _probe_expr_ctxs, _build_expr_ctxs,
_runtime_filter_descs));
- RETURN_IF_ERROR(_runtime_filter_slots->init(
- state, arg.hash_table_ptr->get_size()));
-
RETURN_IF_ERROR(_runtime_filter_slots->apply_from_other(
- ret.runtime_filter_slots));
- {
- SCOPED_TIMER(_push_down_timer);
- _runtime_filter_slots->publish();
- }
- return Status::OK();
- } else {
- arg.hash_table_ptr = &arg.hash_table;
- ProcessRuntimeFilterBuild<HashTableCtxType>
- runtime_filter_build_process(this);
- auto ret = runtime_filter_build_process(state,
arg);
- if (_shared_hashtable_controller) {
- _shared_hashtable_controller
-
->set_short_circuit_for_null_in_probe_side(
-
_short_circuit_for_null_in_probe_side);
- SharedHashTableEntry entry(ret,
arg.hash_table_ptr,
- &_build_blocks,
_runtime_filter_slots);
-
_shared_hashtable_controller->put_hash_table(std::move(entry),
-
id());
- }
- return ret;
- }
- }},
- *_hash_table_variants);
+ _process_hashtable_ctx_variants_init(state);
+ return Status::OK();
}
template <bool BuildSide>
@@ -993,7 +1017,7 @@ void HashJoinNode::_hash_table_init(RuntimeState* state) {
__builtin_unreachable();
},
[&](auto&& arg) {
- arg.hash_table_ptr->set_partitioned_threshold(
+ arg.hash_table.set_partitioned_threshold(
state->partitioned_hash_join_rows_threshold());
}},
*_hash_table_variants);
@@ -1054,6 +1078,12 @@ void HashJoinNode::_reset_tuple_is_null_column() {
}
}
+HashJoinNode::~HashJoinNode() {
+ if (_shared_hashtable_controller && _should_build_hash_table) {
+ _shared_hashtable_controller->signal(id());
+ }
+}
+
void HashJoinNode::_release_mem() {
_arena = nullptr;
_hash_table_variants = nullptr;
@@ -1061,10 +1091,8 @@ void HashJoinNode::_release_mem() {
_null_map_column = nullptr;
_tuple_is_null_left_flag_column = nullptr;
_tuple_is_null_right_flag_column = nullptr;
+ _shared_hash_table_context = nullptr;
_probe_block.clear();
-
- std::vector<Block> tmp_build_blocks;
- _build_blocks.swap(tmp_build_blocks);
}
} // namespace doris::vectorized
diff --git a/be/src/vec/exec/join/vhash_join_node.h
b/be/src/vec/exec/join/vhash_join_node.h
index 72d8317468..90c5e7e3cd 100644
--- a/be/src/vec/exec/join/vhash_join_node.h
+++ b/be/src/vec/exec/join/vhash_join_node.h
@@ -24,7 +24,9 @@
#include "join_op.h"
#include "process_hash_table_probe.h"
#include "vec/common/columns_hashing.h"
+#include "vec/common/hash_table/hash_map.h"
#include "vec/common/hash_table/partitioned_hash_map.h"
+#include "vec/runtime/shared_hash_table_controller.h"
#include "vjoin_node_base.h"
namespace doris {
@@ -44,7 +46,6 @@ struct SerializedHashTableContext {
using Iter = typename HashTable::iterator;
HashTable hash_table;
- HashTable* hash_table_ptr = &hash_table;
Iter iter;
bool inited = false;
@@ -76,7 +77,6 @@ struct PrimaryTypeHashTableContext {
using Iter = typename HashTable::iterator;
HashTable hash_table;
- HashTable* hash_table_ptr = &hash_table;
Iter iter;
bool inited = false;
@@ -111,7 +111,6 @@ struct FixedKeyHashTableContext {
using Iter = typename HashTable::iterator;
HashTable hash_table;
- HashTable* hash_table_ptr = &hash_table;
Iter iter;
bool inited = false;
@@ -185,6 +184,7 @@ public:
static constexpr int PREFETCH_STEP = 64;
HashJoinNode(ObjectPool* pool, const TPlanNode& tnode, const
DescriptorTbl& descs);
+ ~HashJoinNode();
Status init(const TPlanNode& tnode, RuntimeState* state = nullptr)
override;
Status prepare(RuntimeState* state) override;
@@ -235,14 +235,18 @@ private:
RuntimeProfile::Counter* _join_filter_timer;
+ RuntimeProfile* _build_phase_profile;
+
int64_t _mem_used;
- std::unique_ptr<Arena> _arena;
- std::unique_ptr<HashTableVariants> _hash_table_variants;
+ std::shared_ptr<Arena> _arena;
+
+ // maybe share hash table with other fragment instances
+ std::shared_ptr<HashTableVariants> _hash_table_variants;
std::unique_ptr<HashTableCtxVariants> _process_hashtable_ctx_variants;
- std::vector<Block> _build_blocks;
+ std::shared_ptr<std::vector<Block>> _build_blocks;
Block _probe_block;
ColumnRawPtrs _probe_columns;
ColumnUInt8::MutablePtr _null_map_column;
@@ -259,8 +263,9 @@ private:
Sizes _build_key_sz;
bool _is_broadcast_join = false;
- SharedHashTableController* _shared_hashtable_controller = nullptr;
- VRuntimeFilterSlots* _runtime_filter_slots;
+ bool _should_build_hash_table = true;
+ std::shared_ptr<SharedHashTableController> _shared_hashtable_controller =
nullptr;
+ VRuntimeFilterSlots* _runtime_filter_slots = nullptr;
std::vector<SlotId> _hash_output_slot_ids;
std::vector<bool> _left_output_slot_flags;
@@ -269,6 +274,8 @@ private:
MutableColumnPtr _tuple_is_null_left_flag_column;
MutableColumnPtr _tuple_is_null_right_flag_column;
+ SharedHashTableContextPtr _shared_hash_table_context = nullptr;
+
private:
Status _materialize_build_side(RuntimeState* state) override;
diff --git a/be/src/vec/exec/scan/vscan_node.cpp
b/be/src/vec/exec/scan/vscan_node.cpp
index 35c6186ad7..33c6fab3fe 100644
--- a/be/src/vec/exec/scan/vscan_node.cpp
+++ b/be/src/vec/exec/scan/vscan_node.cpp
@@ -189,7 +189,7 @@ Status VScanNode::_register_runtime_filter() {
for (int i = 0; i < filter_size; ++i) {
IRuntimeFilter* runtime_filter = nullptr;
const auto& filter_desc = _runtime_filter_descs[i];
- RETURN_IF_ERROR(_state->runtime_filter_mgr()->regist_filter(
+ RETURN_IF_ERROR(_state->runtime_filter_mgr()->register_filter(
RuntimeFilterRole::CONSUMER, filter_desc,
_state->query_options(), id()));
RETURN_IF_ERROR(_state->runtime_filter_mgr()->get_consume_filter(filter_desc.filter_id,
&runtime_filter));
diff --git a/be/src/vec/runtime/shared_hash_table_controller.cpp
b/be/src/vec/runtime/shared_hash_table_controller.cpp
index 7d92dabedf..e9e125a168 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.cpp
+++ b/be/src/vec/runtime/shared_hash_table_controller.cpp
@@ -22,111 +22,57 @@
namespace doris {
namespace vectorized {
-bool SharedHashTableController::should_build_hash_table(RuntimeState* state,
int my_node_id) {
+bool SharedHashTableController::should_build_hash_table(const TUniqueId&
fragment_instance_id,
+ int my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
auto it = _builder_fragment_ids.find(my_node_id);
if (it == _builder_fragment_ids.cend()) {
- _builder_fragment_ids[my_node_id] = state->fragment_instance_id();
+ _builder_fragment_ids.insert({my_node_id, fragment_instance_id});
return true;
}
return false;
}
-bool SharedHashTableController::supposed_to_build_hash_table(RuntimeState*
state, int my_node_id) {
+SharedHashTableContextPtr SharedHashTableController::get_context(int
my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
- auto it = _builder_fragment_ids.find(my_node_id);
- if (it != _builder_fragment_ids.cend()) {
- return _builder_fragment_ids[my_node_id] ==
state->fragment_instance_id();
+ auto it = _shared_contexts.find(my_node_id);
+ if (it == _shared_contexts.cend()) {
+ _shared_contexts.insert({my_node_id,
std::make_shared<SharedHashTableContext>()});
}
- return false;
+ return _shared_contexts[my_node_id];
}
-void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry,
int my_node_id) {
+void SharedHashTableController::signal(int my_node_id) {
std::lock_guard<std::mutex> lock(_mutex);
- DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend());
- _hash_table_entries.insert({my_node_id, std::move(entry)});
- _cv.notify_all();
-}
-
-SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int
my_node_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _hash_table_entries.find(my_node_id);
- if (it == _hash_table_entries.cend()) {
- _cv.wait(lock, [this, &it, my_node_id]() {
- it = _hash_table_entries.find(my_node_id);
- return it != _hash_table_entries.cend();
- });
+ auto it = _shared_contexts.find(my_node_id);
+ if (it != _shared_contexts.cend()) {
+ it->second->signaled = true;
+ _shared_contexts.erase(it);
}
- return it->second;
-}
-
-void SharedHashTableController::acquire_ref_count(RuntimeState* state, int
my_node_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id());
-}
-
-Status SharedHashTableController::release_ref_count(RuntimeState* state, int
my_node_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- auto id = state->fragment_instance_id();
- auto it = std::find(_ref_fragments[my_node_id].begin(),
_ref_fragments[my_node_id].end(), id);
- CHECK(it != _ref_fragments[my_node_id].end());
- _ref_fragments[my_node_id].erase(it);
- _put_an_empty_entry_if_need(Status::Cancelled("hash table not build"), id,
my_node_id);
_cv.notify_all();
- return Status::OK();
}
-void SharedHashTableController::_put_an_empty_entry_if_need(Status status,
TUniqueId fragment_id,
- int node_id) {
- auto builder_it = _builder_fragment_ids.find(node_id);
- if (builder_it != _builder_fragment_ids.end()) {
- if (builder_it->second == fragment_id) {
- if (_hash_table_entries.find(builder_it->first) ==
_hash_table_entries.cend()) {
- // "here put an empty SharedHashTableEntry to avoid
deadlocking"
- _hash_table_entries.insert(
- {builder_it->first,
SharedHashTableEntry::empty_entry_with_status(status)});
- }
- }
- }
-}
-
-Status SharedHashTableController::release_ref_count_if_need(TUniqueId
fragment_id, Status status) {
- std::unique_lock<std::mutex> lock(_mutex);
- bool need_to_notify = false;
- for (auto& ref : _ref_fragments) {
- auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id);
- if (it == ref.second.end()) {
- continue;
- }
- ref.second.erase(it);
- need_to_notify = true;
- LOG(INFO) << "release_ref_count in node: " << ref.first
- << " for fragment id: " << fragment_id;
- }
-
- for (auto& builder : _builder_fragment_ids) {
- if (builder.second == fragment_id) {
- if (_hash_table_entries.find(builder.first) ==
_hash_table_entries.cend()) {
- _hash_table_entries.insert(
- {builder.first,
SharedHashTableEntry::empty_entry_with_status(status)});
- }
- }
- }
-
- if (need_to_notify) {
- _cv.notify_all();
+TUniqueId SharedHashTableController::get_builder_fragment_instance_id(int
my_node_id) {
+ std::lock_guard<std::mutex> lock(_mutex);
+ auto it = _builder_fragment_ids.find(my_node_id);
+ if (it == _builder_fragment_ids.cend()) {
+ return TUniqueId {};
}
- return Status::OK();
+ return it->second;
}
-Status SharedHashTableController::wait_for_closable(RuntimeState* state, int
my_node_id) {
+Status SharedHashTableController::wait_for_signal(RuntimeState* state,
+ const
SharedHashTableContextPtr& context) {
std::unique_lock<std::mutex> lock(_mutex);
- if (!_ref_fragments[my_node_id].empty()) {
- _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); });
+ // maybe builder signaled before other instances waiting,
+ // so here need to check value of `signaled`
+ while (!context->signaled) {
+ _cv.wait_for(lock, std::chrono::milliseconds(400), [&]() { return
context->signaled; });
+ // return if the instances is cancelled(eg. query timeout)
+ RETURN_IF_CANCELLED(state);
}
- RETURN_IF_CANCELLED(state);
- return Status::OK();
+ return context->status;
}
} // namespace vectorized
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/runtime/shared_hash_table_controller.h
b/be/src/vec/runtime/shared_hash_table_controller.h
index 9836c3ec3f..d7d50570db 100644
--- a/be/src/vec/runtime/shared_hash_table_controller.h
+++ b/be/src/vec/runtime/shared_hash_table_controller.h
@@ -29,70 +29,50 @@ namespace doris {
class RuntimeState;
class TUniqueId;
-
-template <typename ExprCtxType>
-class RuntimeFilterSlotsBase;
+class MinMaxFuncBase;
+class HybridSetBase;
+class BloomFilterFuncBase;
namespace vectorized {
class VExprContext;
-struct SharedHashTableEntry {
- SharedHashTableEntry(Status status_, void* hash_table_ptr_,
std::vector<Block>* blocks_,
- RuntimeFilterSlotsBase<VExprContext>*
runtime_filter_slots_)
- : status(status_),
- hash_table_ptr(hash_table_ptr_),
- blocks(blocks_),
- runtime_filter_slots(runtime_filter_slots_) {}
- SharedHashTableEntry(SharedHashTableEntry&& entry)
- : status(entry.status),
- hash_table_ptr(entry.hash_table_ptr),
- blocks(entry.blocks),
- runtime_filter_slots(entry.runtime_filter_slots) {}
+struct SharedRuntimeFilterContext {
+ std::shared_ptr<MinMaxFuncBase> minmax_func;
+ std::shared_ptr<HybridSetBase> hybrid_set;
+ std::shared_ptr<BloomFilterFuncBase> bloom_filter_func;
+};
- static SharedHashTableEntry empty_entry_with_status(const Status& status) {
- return SharedHashTableEntry(status, nullptr, nullptr, nullptr);
- }
+struct SharedHashTableContext {
+ SharedHashTableContext()
+ : hash_table_variants(nullptr),
+ signaled(false),
+ short_circuit_for_null_in_probe_side(false) {}
Status status;
- void* hash_table_ptr;
- std::vector<Block>* blocks;
- RuntimeFilterSlotsBase<VExprContext>* runtime_filter_slots;
+ std::shared_ptr<Arena> arena;
+ std::shared_ptr<void> hash_table_variants;
+ std::shared_ptr<std::vector<Block>> blocks;
+ std::map<int, SharedRuntimeFilterContext> runtime_filters;
+ bool signaled;
+ bool short_circuit_for_null_in_probe_side;
};
+using SharedHashTableContextPtr = std::shared_ptr<SharedHashTableContext>;
+
class SharedHashTableController {
public:
- bool should_build_hash_table(RuntimeState* state, int my_node_id);
- bool supposed_to_build_hash_table(RuntimeState* state, int my_node_id);
- void acquire_ref_count(RuntimeState* state, int my_node_id);
- SharedHashTableEntry& wait_for_hash_table(int my_node_id);
- Status release_ref_count(RuntimeState* state, int my_node_id);
- Status release_ref_count_if_need(TUniqueId fragment_id, Status status);
- void put_hash_table(SharedHashTableEntry&& entry, int my_node_id);
- Status wait_for_closable(RuntimeState* state, int my_node_id);
-
- // Single-thread operation
- void set_short_circuit_for_null_in_probe_side(bool
short_circuit_for_null_in_probe_side) {
- _short_circuit_for_null_in_probe_side =
short_circuit_for_null_in_probe_side;
- }
-
- bool short_circuit_for_null_in_probe_side() const {
- return _short_circuit_for_null_in_probe_side;
- }
-
-private:
- // If the fragment instance was supposed to build hash table, but it
didn't build.
- // To avoid deadlocking other fragment instances,
- // here need to put an empty SharedHashTableEntry with canceled status.
- void _put_an_empty_entry_if_need(Status status, TUniqueId fragment_id, int
node_id);
+ TUniqueId get_builder_fragment_instance_id(int my_node_id);
+ SharedHashTableContextPtr get_context(int my_node_id);
+ void signal(int my_node_id);
+ Status wait_for_signal(RuntimeState* state, const
SharedHashTableContextPtr& context);
+ bool should_build_hash_table(const TUniqueId& fragment_instance_id, int
my_node_id);
private:
std::mutex _mutex;
std::condition_variable _cv;
- std::map<int /*node id*/, TUniqueId /*fragment id*/> _builder_fragment_ids;
- std::map<int /*node id*/, SharedHashTableEntry> _hash_table_entries;
- std::map<int /*node id*/, std::vector<TUniqueId>> _ref_fragments;
- bool _short_circuit_for_null_in_probe_side;
+ std::map<int /*node id*/, TUniqueId /*fragment instance id*/>
_builder_fragment_ids;
+ std::map<int /*node id*/, SharedHashTableContextPtr> _shared_contexts;
};
} // namespace vectorized
diff --git a/be/src/vec/runtime/shared_hashtable_controller.cpp
b/be/src/vec/runtime/shared_hashtable_controller.cpp
deleted file mode 100644
index a761da8c2e..0000000000
--- a/be/src/vec/runtime/shared_hashtable_controller.cpp
+++ /dev/null
@@ -1,95 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "shared_hashtable_controller.h"
-
-#include <runtime/runtime_state.h>
-
-namespace doris {
-namespace vectorized {
-
-bool SharedHashTableController::should_build_hash_table(RuntimeState* state,
int my_node_id) {
- std::lock_guard<std::mutex> lock(_mutex);
- auto it = _builder_fragment_ids.find(my_node_id);
- if (it == _builder_fragment_ids.cend()) {
- _builder_fragment_ids[my_node_id] = state->fragment_instance_id();
- return true;
- }
- return false;
-}
-
-void SharedHashTableController::put_hash_table(SharedHashTableEntry&& entry,
int my_node_id) {
- std::lock_guard<std::mutex> lock(_mutex);
- DCHECK(_hash_table_entries.find(my_node_id) == _hash_table_entries.cend());
- _hash_table_entries.insert({my_node_id, std::move(entry)});
- _cv.notify_all();
-}
-
-SharedHashTableEntry& SharedHashTableController::wait_for_hash_table(int
my_node_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- auto it = _hash_table_entries.find(my_node_id);
- if (it == _hash_table_entries.cend()) {
- _cv.wait(lock, [this, &it, my_node_id]() {
- it = _hash_table_entries.find(my_node_id);
- return it != _hash_table_entries.cend();
- });
- }
- return it->second;
-}
-
-void SharedHashTableController::acquire_ref_count(RuntimeState* state, int
my_node_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- _ref_fragments[my_node_id].emplace_back(state->fragment_instance_id());
-}
-
-Status SharedHashTableController::release_ref_count(RuntimeState* state, int
my_node_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- RETURN_IF_CANCELLED(state);
- auto id = state->fragment_instance_id();
- auto it = std::find(_ref_fragments[my_node_id].begin(),
_ref_fragments[my_node_id].end(), id);
- CHECK(it != _ref_fragments[my_node_id].end());
- _ref_fragments[my_node_id].erase(it);
- _cv.notify_all();
- return Status::OK();
-}
-
-Status SharedHashTableController::release_ref_count_if_need(TUniqueId
fragment_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- bool need_to_notify = false;
- for (auto& ref : _ref_fragments) {
- auto it = std::find(ref.second.begin(), ref.second.end(), fragment_id);
- if (it == ref.second.end()) continue;
- ref.second.erase(it);
- need_to_notify = true;
- LOG(INFO) << "release_ref_count in node: " << ref.first
- << " for fragment id: " << fragment_id;
- }
- if (need_to_notify) _cv.notify_all();
- return Status::OK();
-}
-
-Status SharedHashTableController::wait_for_closable(RuntimeState* state, int
my_node_id) {
- std::unique_lock<std::mutex> lock(_mutex);
- RETURN_IF_CANCELLED(state);
- if (!_ref_fragments[my_node_id].empty()) {
- _cv.wait(lock, [&]() { return _ref_fragments[my_node_id].empty(); });
- }
- return Status::OK();
-}
-
-} // namespace vectorized
-} // namespace doris
\ No newline at end of file
diff --git a/be/src/vec/runtime/shared_hashtable_controller.h
b/be/src/vec/runtime/shared_hashtable_controller.h
deleted file mode 100644
index 842dc89e90..0000000000
--- a/be/src/vec/runtime/shared_hashtable_controller.h
+++ /dev/null
@@ -1,75 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <condition_variable>
-#include <map>
-#include <memory>
-#include <mutex>
-#include <vector>
-
-#include "vec/core/block.h"
-
-namespace doris {
-
-class RuntimeState;
-class TUniqueId;
-
-namespace vectorized {
-
-class VExprContext;
-
-struct SharedHashTableEntry {
- SharedHashTableEntry(void* hash_table_ptr_, std::vector<Block>& blocks_,
- std::unordered_map<const Block*, std::vector<int>>&
inserted_rows_,
- const std::vector<VExprContext*>& exprs)
- : hash_table_ptr(hash_table_ptr_),
- blocks(blocks_),
- inserted_rows(inserted_rows_),
- build_exprs(exprs) {}
- SharedHashTableEntry(SharedHashTableEntry&& entry)
- : hash_table_ptr(entry.hash_table_ptr),
- blocks(entry.blocks),
- inserted_rows(entry.inserted_rows),
- build_exprs(entry.build_exprs) {}
- void* hash_table_ptr;
- std::vector<Block>& blocks;
- std::unordered_map<const Block*, std::vector<int>>& inserted_rows;
- std::vector<VExprContext*> build_exprs;
-};
-
-class SharedHashTableController {
-public:
- bool should_build_hash_table(RuntimeState* state, int my_node_id);
- void acquire_ref_count(RuntimeState* state, int my_node_id);
- SharedHashTableEntry& wait_for_hash_table(int my_node_id);
- Status release_ref_count(RuntimeState* state, int my_node_id);
- Status release_ref_count_if_need(TUniqueId fragment_id);
- void put_hash_table(SharedHashTableEntry&& entry, int my_node_id);
- Status wait_for_closable(RuntimeState* state, int my_node_id);
-
-private:
- std::mutex _mutex;
- std::condition_variable _cv;
- std::map<int /*node id*/, TUniqueId /*fragment id*/> _builder_fragment_ids;
- std::map<int /*node id*/, SharedHashTableEntry> _hash_table_entries;
- std::map<int /*node id*/, std::vector<TUniqueId>> _ref_fragments;
-};
-
-} // namespace vectorized
-} // namespace doris
\ No newline at end of file
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index c8622de800..3bcf3ed0fc 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -231,6 +231,9 @@ public class SessionVariable implements Serializable,
Writable {
public static final String PARTITIONED_HASH_JOIN_ROWS_THRESHOLD =
"partitioned_hash_join_rows_threshold";
+ public static final String ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN
+ = "enable_share_hash_table_for_broadcast_join";
+
// session origin value
public Map<Field, String> sessionOriginValue = new HashMap<Field,
String>();
// check stmt is or not [select /*+ SET_VAR(...)*/ ...]
@@ -607,6 +610,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = PARTITIONED_HASH_JOIN_ROWS_THRESHOLD)
public int partitionedHashJoinRowsThreshold = 0;
+ @VariableMgr.VarAttr(name = ENABLE_SHARE_HASH_TABLE_FOR_BROADCAST_JOIN)
+ public boolean enableShareHashTableForBroadcastJoin = true;
+
// If this fe is in fuzzy mode, then will use initFuzzyModeVariables to
generate some variables,
// not the default value set in the code.
public void initFuzzyModeVariables() {
@@ -1227,6 +1233,7 @@ public class SessionVariable implements Serializable,
Writable {
tResult.setBeExecVersion(Config.be_exec_version);
tResult.setReturnObjectDataAsBinary(returnObjectDataAsBinary);
tResult.setTrimTailingSpacesForExternalTableQuery(trimTailingSpacesForExternalTableQuery);
+
tResult.setEnableShareHashTableForBroadcastJoin(enableShareHashTableForBroadcastJoin);
tResult.setBatchSize(batchSize);
tResult.setDisableStreamPreaggregations(disableStreamPreaggregations);
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index b725ea9191..47a9144269 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -181,6 +181,8 @@ struct TQueryOptions {
52: optional i32 be_exec_version = 0
53: optional i32 partitioned_hash_join_rows_threshold = 0
+
+ 54: optional bool enable_share_hash_table_for_broadcast_join
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]