This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new d542818fbcb [UT](runtime filter) Add runtime filter test case (#47035)
d542818fbcb is described below
commit d542818fbcbdffb94e44d2f1dc20f0b34f1be95a
Author: Gabriel <[email protected]>
AuthorDate: Thu Jan 16 10:12:04 2025 +0800
[UT](runtime filter) Add runtime filter test case (#47035)
---
be/src/exprs/runtime_filter.cpp | 1343 ++++++++++++-------------
be/src/exprs/runtime_filter.h | 111 ++
be/src/exprs/runtime_filter_slots.h | 54 +-
be/src/pipeline/exec/operator.cpp | 2 +-
be/src/pipeline/pipeline_fragment_context.cpp | 3 +-
be/src/pipeline/pipeline_fragment_context.h | 3 +-
be/src/pipeline/task_queue.h | 9 +-
be/test/pipeline/dummy_task_queue.h | 50 +
be/test/pipeline/pipeline_test.cpp | 276 ++++-
be/test/pipeline/thrift_builder.h | 106 +-
gensrc/thrift/PaloInternalService.thrift | 5 +-
gensrc/thrift/PlanNodes.thrift | 2 +-
12 files changed, 1174 insertions(+), 790 deletions(-)
diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp
index b85f370165a..33cc155fc6f 100644
--- a/be/src/exprs/runtime_filter.cpp
+++ b/be/src/exprs/runtime_filter.cpp
@@ -283,711 +283,6 @@ Status create_vbin_predicate(const TypeDescriptor& type,
TExprOpcode::type opcod
*tnode = node;
return vectorized::VExpr::create_expr(node, expr);
}
-// This class is a wrapper of runtime predicate function
-class RuntimePredicateWrapper {
-public:
- RuntimePredicateWrapper(const RuntimeFilterParams* params)
- : RuntimePredicateWrapper(params->column_return_type,
params->filter_type,
- params->filter_id) {};
- // for a 'tmp' runtime predicate wrapper
- // only could called assign method or as a param for merge
- RuntimePredicateWrapper(PrimitiveType column_type, RuntimeFilterType type,
uint32_t filter_id)
- : _column_return_type(column_type),
- _filter_type(type),
- _context(new RuntimeFilterContext()),
- _filter_id(filter_id) {}
-
- // init runtime filter wrapper
- // alloc memory to init runtime filter function
- Status init(const RuntimeFilterParams* params) {
- _max_in_num = params->max_in_num;
- switch (_filter_type) {
- case RuntimeFilterType::IN_FILTER: {
- _context->hybrid_set.reset(create_set(_column_return_type));
- _context->hybrid_set->set_null_aware(params->null_aware);
- break;
- }
- // Only use in nested loop join not need set null aware
- case RuntimeFilterType::MIN_FILTER:
- case RuntimeFilterType::MAX_FILTER: {
-
_context->minmax_func.reset(create_minmax_filter(_column_return_type));
- break;
- }
- case RuntimeFilterType::MINMAX_FILTER: {
-
_context->minmax_func.reset(create_minmax_filter(_column_return_type));
- _context->minmax_func->set_null_aware(params->null_aware);
- break;
- }
- case RuntimeFilterType::BLOOM_FILTER: {
-
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
- _context->bloom_filter_func->init_params(params);
- return Status::OK();
- }
- case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
- _context->hybrid_set.reset(create_set(_column_return_type));
- _context->hybrid_set->set_null_aware(params->null_aware);
-
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
- _context->bloom_filter_func->init_params(params);
- return Status::OK();
- }
- case RuntimeFilterType::BITMAP_FILTER: {
-
_context->bitmap_filter_func.reset(create_bitmap_filter(_column_return_type));
-
_context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in);
- return Status::OK();
- }
- default:
- return Status::InternalError("Unknown Filter type");
- }
- return Status::OK();
- }
-
- Status change_to_bloom_filter() {
- if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- return Status::InternalError(
- "Can not change to bloom filter because of runtime filter
type is {}",
- IRuntimeFilter::to_string(_filter_type));
- }
- BloomFilterFuncBase* bf = _context->bloom_filter_func.get();
-
- if (bf != nullptr) {
- insert_to_bloom_filter(bf);
- } else if (_context->hybrid_set != nullptr &&
_context->hybrid_set->size() != 0) {
- return Status::InternalError("change to bloom filter need empty
set ",
-
IRuntimeFilter::to_string(_filter_type));
- }
-
- // release in filter
- _context->hybrid_set.reset();
- return Status::OK();
- }
-
- Status init_bloom_filter(const size_t build_bf_cardinality) {
- if (_filter_type != RuntimeFilterType::BLOOM_FILTER &&
- _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- throw Exception(ErrorCode::INTERNAL_ERROR,
- "init_bloom_filter meet invalid input type {}",
int(_filter_type));
- }
- return
_context->bloom_filter_func->init_with_cardinality(build_bf_cardinality);
- }
-
- bool get_build_bf_cardinality() const {
- if (_filter_type == RuntimeFilterType::BLOOM_FILTER ||
- _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- return _context->bloom_filter_func->get_build_bf_cardinality();
- }
- return false;
- }
-
- void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const {
- if (_context->hybrid_set->size() > 0) {
- auto* it = _context->hybrid_set->begin();
- while (it->has_next()) {
- bloom_filter->insert(it->get_value());
- it->next();
- }
- }
- if (_context->hybrid_set->contain_null()) {
- bloom_filter->set_contain_null_and_null_aware();
- }
- }
-
- BloomFilterFuncBase* get_bloomfilter() const { return
_context->bloom_filter_func.get(); }
-
- void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start) {
- if (is_ignored()) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet
ignored rf");
- }
- switch (_filter_type) {
- case RuntimeFilterType::IN_FILTER: {
- _context->hybrid_set->insert_fixed_len(column, start);
- break;
- }
- case RuntimeFilterType::MIN_FILTER:
- case RuntimeFilterType::MAX_FILTER:
- case RuntimeFilterType::MINMAX_FILTER: {
- _context->minmax_func->insert_fixed_len(column, start);
- break;
- }
- case RuntimeFilterType::BLOOM_FILTER: {
- _context->bloom_filter_func->insert_fixed_len(column, start);
- break;
- }
- case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
- if (is_bloomfilter()) {
- _context->bloom_filter_func->insert_fixed_len(column, start);
- } else {
- _context->hybrid_set->insert_fixed_len(column, start);
- }
- break;
- }
- default:
- DCHECK(false);
- break;
- }
- }
-
- void insert_batch(const vectorized::ColumnPtr& column, size_t start) {
- if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
- bitmap_filter_insert_batch(column, start);
- } else {
- insert_fixed_len(column, start);
- }
- }
-
- void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t
start) {
- std::vector<const BitmapValue*> bitmaps;
- if (column->is_nullable()) {
- const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
- const auto& col =
- assert_cast<const
vectorized::ColumnBitmap&>(nullable->get_nested_column());
- const auto& nullmap =
- assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
- .get_data();
- for (size_t i = start; i < column->size(); i++) {
- if (!nullmap[i]) {
- bitmaps.push_back(&(col.get_data()[i]));
- }
- }
- } else {
- const auto* col = assert_cast<const
vectorized::ColumnBitmap*>(column.get());
- for (size_t i = start; i < column->size(); i++) {
- bitmaps.push_back(&(col->get_data()[i]));
- }
- }
- _context->bitmap_filter_func->insert_many(bitmaps);
- }
-
- RuntimeFilterType get_real_type() const {
- if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- if (_context->hybrid_set) {
- return RuntimeFilterType::IN_FILTER;
- }
- return RuntimeFilterType::BLOOM_FILTER;
- }
- return _filter_type;
- }
-
- size_t get_bloom_filter_size() const {
- return _context->bloom_filter_func ?
_context->bloom_filter_func->get_size() : 0;
- }
-
- Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
- std::vector<vectorized::VRuntimeFilterPtr>&
push_exprs,
- const TExpr& probe_expr);
-
- Status merge(const RuntimePredicateWrapper* wrapper) {
- if (wrapper->is_disabled()) {
- set_disabled();
- return Status::OK();
- }
-
- if (wrapper->is_ignored() || is_disabled()) {
- return Status::OK();
- }
-
- _context->ignored = false;
-
- bool can_not_merge_in_or_bloom =
- _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
- (wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
- wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER &&
- wrapper->_filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER);
-
- bool can_not_merge_other = _filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
- _filter_type != wrapper->_filter_type;
-
- CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
- << " can not merge runtime filter(id=" << _filter_id
- << "), current is filter type is " <<
IRuntimeFilter::to_string(_filter_type)
- << ", other filter type is " <<
IRuntimeFilter::to_string(wrapper->_filter_type);
-
- switch (_filter_type) {
- case RuntimeFilterType::IN_FILTER: {
- _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
- if (_max_in_num >= 0 && _context->hybrid_set->size() >=
_max_in_num) {
- set_disabled();
- }
- break;
- }
- case RuntimeFilterType::MIN_FILTER:
- case RuntimeFilterType::MAX_FILTER:
- case RuntimeFilterType::MINMAX_FILTER: {
-
RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get()));
- break;
- }
- case RuntimeFilterType::BLOOM_FILTER: {
- RETURN_IF_ERROR(
-
_context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get()));
- break;
- }
- case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
- auto real_filter_type = get_real_type();
-
- auto other_filter_type = wrapper->_filter_type;
- if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
- other_filter_type = wrapper->get_real_type();
- }
-
- if (real_filter_type == RuntimeFilterType::IN_FILTER) {
- // when we meet base rf is in-filter, threre only have two
case:
- // case1: all input-filter's build_bf_exactly is true, inited
by synced global size
- // case2: all input-filter's build_bf_exactly is false, inited
by default size
- if (other_filter_type == RuntimeFilterType::IN_FILTER) {
-
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
- if (_max_in_num >= 0 && _context->hybrid_set->size() >=
_max_in_num) {
- // case2: use default size to init bf
-
RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length());
- RETURN_IF_ERROR(change_to_bloom_filter());
- }
- } else {
- // case1&case2: use input bf directly and insert hybrid
set data into bf
- _context->bloom_filter_func =
wrapper->_context->bloom_filter_func;
- RETURN_IF_ERROR(change_to_bloom_filter());
- }
- } else {
- if (other_filter_type == RuntimeFilterType::IN_FILTER) {
- // case2: insert data to global filter
-
wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get());
- } else {
- // case1&case2: all input bf must has same size
- RETURN_IF_ERROR(_context->bloom_filter_func->merge(
- wrapper->_context->bloom_filter_func.get()));
- }
- }
- break;
- }
- case RuntimeFilterType::BITMAP_FILTER: {
- // use input bitmap directly because we assume bitmap filter join
always have full data
- _context->bitmap_filter_func =
wrapper->_context->bitmap_filter_func;
- break;
- }
- default:
- return Status::InternalError("unknown runtime filter");
- }
- return Status::OK();
- }
-
- Status assign(const PInFilter* in_filter, bool contain_null) {
- _context->hybrid_set.reset(create_set(_column_return_type));
- if (contain_null) {
- _context->hybrid_set->set_null_aware(true);
- _context->hybrid_set->insert((const void*)nullptr);
- }
-
- switch (_column_return_type) {
- case TYPE_BOOLEAN: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- bool bool_val = column.boolval();
- set->insert(&bool_val);
- });
- break;
- }
- case TYPE_TINYINT: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto int_val = static_cast<int8_t>(column.intval());
- set->insert(&int_val);
- });
- break;
- }
- case TYPE_SMALLINT: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto int_val = static_cast<int16_t>(column.intval());
- set->insert(&int_val);
- });
- break;
- }
- case TYPE_INT: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- int32_t int_val = column.intval();
- set->insert(&int_val);
- });
- break;
- }
- case TYPE_BIGINT: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- int64_t long_val = column.longval();
- set->insert(&long_val);
- });
- break;
- }
- case TYPE_LARGEINT: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto string_val = column.stringval();
- StringParser::ParseResult result;
- auto int128_val = StringParser::string_to_int<int128_t>(
- string_val.c_str(), string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- set->insert(&int128_val);
- });
- break;
- }
- case TYPE_FLOAT: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto float_val = static_cast<float>(column.doubleval());
- set->insert(&float_val);
- });
- break;
- }
- case TYPE_DOUBLE: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- double double_val = column.doubleval();
- set->insert(&double_val);
- });
- break;
- }
- case TYPE_DATEV2: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto date_v2_val = column.intval();
- set->insert(&date_v2_val);
- });
- break;
- }
- case TYPE_DATETIMEV2: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto date_v2_val = column.longval();
- set->insert(&date_v2_val);
- });
- break;
- }
- case TYPE_DATETIME:
- case TYPE_DATE: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- const auto& string_val_ref = column.stringval();
- VecDateTimeValue datetime_val;
- datetime_val.from_date_str(string_val_ref.c_str(),
string_val_ref.length());
- set->insert(&datetime_val);
- });
- break;
- }
- case TYPE_DECIMALV2: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- const auto& string_val_ref = column.stringval();
- DecimalV2Value decimal_val(string_val_ref);
- set->insert(&decimal_val);
- });
- break;
- }
- case TYPE_DECIMAL32: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- int32_t decimal_32_val = column.intval();
- set->insert(&decimal_32_val);
- });
- break;
- }
- case TYPE_DECIMAL64: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- int64_t decimal_64_val = column.longval();
- set->insert(&decimal_64_val);
- });
- break;
- }
- case TYPE_DECIMAL128I: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto string_val = column.stringval();
- StringParser::ParseResult result;
- auto int128_val = StringParser::string_to_int<int128_t>(
- string_val.c_str(), string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- set->insert(&int128_val);
- });
- break;
- }
- case TYPE_DECIMAL256: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto string_val = column.stringval();
- StringParser::ParseResult result;
- auto int_val = StringParser::string_to_int<wide::Int256>(
- string_val.c_str(), string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- set->insert(&int_val);
- });
- break;
- }
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- case TYPE_STRING: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- const std::string& string_value = column.stringval();
- // string_value is std::string, call insert(data, size)
function in StringSet will not cast as StringRef
- // so could avoid some cast error at different class object.
- set->insert((void*)string_value.data(), string_value.size());
- });
- break;
- }
- case TYPE_IPV4: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- int32_t tmp = column.intval();
- set->insert(&tmp);
- });
- break;
- }
- case TYPE_IPV6: {
- batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
- auto string_val = column.stringval();
- StringParser::ParseResult result;
- auto int128_val = StringParser::string_to_int<uint128_t>(
- string_val.c_str(), string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- set->insert(&int128_val);
- });
- break;
- }
- default: {
- return Status::InternalError("not support assign to in filter,
type: " +
- type_to_string(_column_return_type));
- }
- }
- return Status::OK();
- }
-
- void set_enable_fixed_len_to_uint32_v2() {
- if (_context->bloom_filter_func) {
- _context->bloom_filter_func->set_enable_fixed_len_to_uint32_v2();
- }
- }
-
- // used by shuffle runtime filter
- // assign this filter by protobuf
- Status assign(const PBloomFilter* bloom_filter,
butil::IOBufAsZeroCopyInputStream* data,
- bool contain_null) {
- // we won't use this class to insert or find any data
- // so any type is ok
-
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type ==
INVALID_TYPE
- ?
PrimitiveType::TYPE_INT
- :
_column_return_type));
- RETURN_IF_ERROR(_context->bloom_filter_func->assign(data,
bloom_filter->filter_length(),
- contain_null));
- return Status::OK();
- }
-
- // used by shuffle runtime filter
- // assign this filter by protobuf
- Status assign(const PMinMaxFilter* minmax_filter, bool contain_null) {
- _context->minmax_func.reset(create_minmax_filter(_column_return_type));
-
- if (contain_null) {
- _context->minmax_func->set_null_aware(true);
- _context->minmax_func->set_contain_null();
- }
-
- switch (_column_return_type) {
- case TYPE_BOOLEAN: {
- bool min_val = minmax_filter->min_val().boolval();
- bool max_val = minmax_filter->max_val().boolval();
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_TINYINT: {
- auto min_val =
static_cast<int8_t>(minmax_filter->min_val().intval());
- auto max_val =
static_cast<int8_t>(minmax_filter->max_val().intval());
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_SMALLINT: {
- auto min_val =
static_cast<int16_t>(minmax_filter->min_val().intval());
- auto max_val =
static_cast<int16_t>(minmax_filter->max_val().intval());
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_INT: {
- int32_t min_val = minmax_filter->min_val().intval();
- int32_t max_val = minmax_filter->max_val().intval();
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_BIGINT: {
- int64_t min_val = minmax_filter->min_val().longval();
- int64_t max_val = minmax_filter->max_val().longval();
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_LARGEINT: {
- auto min_string_val = minmax_filter->min_val().stringval();
- auto max_string_val = minmax_filter->max_val().stringval();
- StringParser::ParseResult result;
- auto min_val =
StringParser::string_to_int<int128_t>(min_string_val.c_str(),
-
min_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- auto max_val =
StringParser::string_to_int<int128_t>(max_string_val.c_str(),
-
max_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_FLOAT: {
- auto min_val =
static_cast<float>(minmax_filter->min_val().doubleval());
- auto max_val =
static_cast<float>(minmax_filter->max_val().doubleval());
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DOUBLE: {
- auto min_val =
static_cast<double>(minmax_filter->min_val().doubleval());
- auto max_val =
static_cast<double>(minmax_filter->max_val().doubleval());
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DATEV2: {
- int32_t min_val = minmax_filter->min_val().intval();
- int32_t max_val = minmax_filter->max_val().intval();
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DATETIMEV2: {
- int64_t min_val = minmax_filter->min_val().longval();
- int64_t max_val = minmax_filter->max_val().longval();
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DATETIME:
- case TYPE_DATE: {
- const auto& min_val_ref = minmax_filter->min_val().stringval();
- const auto& max_val_ref = minmax_filter->max_val().stringval();
- VecDateTimeValue min_val;
- VecDateTimeValue max_val;
- min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
- max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DECIMALV2: {
- const auto& min_val_ref = minmax_filter->min_val().stringval();
- const auto& max_val_ref = minmax_filter->max_val().stringval();
- DecimalV2Value min_val(min_val_ref);
- DecimalV2Value max_val(max_val_ref);
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DECIMAL32: {
- int32_t min_val = minmax_filter->min_val().intval();
- int32_t max_val = minmax_filter->max_val().intval();
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DECIMAL64: {
- int64_t min_val = minmax_filter->min_val().longval();
- int64_t max_val = minmax_filter->max_val().longval();
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DECIMAL128I: {
- auto min_string_val = minmax_filter->min_val().stringval();
- auto max_string_val = minmax_filter->max_val().stringval();
- StringParser::ParseResult result;
- auto min_val =
StringParser::string_to_int<int128_t>(min_string_val.c_str(),
-
min_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- auto max_val =
StringParser::string_to_int<int128_t>(max_string_val.c_str(),
-
max_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_DECIMAL256: {
- auto min_string_val = minmax_filter->min_val().stringval();
- auto max_string_val = minmax_filter->max_val().stringval();
- StringParser::ParseResult result;
- auto min_val = StringParser::string_to_int<wide::Int256>(
- min_string_val.c_str(), min_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- auto max_val = StringParser::string_to_int<wide::Int256>(
- max_string_val.c_str(), max_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- case TYPE_VARCHAR:
- case TYPE_CHAR:
- case TYPE_STRING: {
- auto min_val_ref = minmax_filter->min_val().stringval();
- auto max_val_ref = minmax_filter->max_val().stringval();
- return _context->minmax_func->assign(&min_val_ref, &max_val_ref);
- }
- case TYPE_IPV4: {
- int tmp_min = minmax_filter->min_val().intval();
- int tmp_max = minmax_filter->max_val().intval();
- return _context->minmax_func->assign(&tmp_min, &tmp_max);
- }
- case TYPE_IPV6: {
- auto min_string_val = minmax_filter->min_val().stringval();
- auto max_string_val = minmax_filter->max_val().stringval();
- StringParser::ParseResult result;
- auto min_val =
StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
-
min_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- auto max_val =
StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
-
max_string_val.length(), &result);
- DCHECK(result == StringParser::PARSE_SUCCESS);
- return _context->minmax_func->assign(&min_val, &max_val);
- }
- default:
- break;
- }
- return Status::InternalError("not support!");
- }
-
- void get_bloom_filter_desc(char** data, int* filter_length) {
- _context->bloom_filter_func->get_data(data, filter_length);
- }
-
- PrimitiveType column_type() { return _column_return_type; }
-
- bool is_bloomfilter() const { return get_real_type() ==
RuntimeFilterType::BLOOM_FILTER; }
-
- bool contain_null() const {
- if (is_bloomfilter()) {
- return _context->bloom_filter_func->contain_null();
- }
- if (_context->hybrid_set) {
- if (get_real_type() != RuntimeFilterType::IN_FILTER) {
- throw Exception(ErrorCode::INTERNAL_ERROR, "rf has hybrid_set
but real type is {}",
- int(get_real_type()));
- }
- return _context->hybrid_set->contain_null();
- }
- if (_context->minmax_func) {
- return _context->minmax_func->contain_null();
- }
- return false;
- }
-
- bool is_ignored() const { return _context->ignored; }
-
- void set_ignored() { _context->ignored = true; }
-
- bool is_disabled() const { return _context->disabled; }
-
- void set_disabled() {
- _context->disabled = true;
- _context->minmax_func.reset();
- _context->hybrid_set.reset();
- _context->bloom_filter_func.reset();
- _context->bitmap_filter_func.reset();
- }
-
- void batch_assign(const PInFilter* filter,
- void (*assign_func)(std::shared_ptr<HybridSetBase>&
_hybrid_set,
- PColumnValue&)) {
- for (int i = 0; i < filter->values_size(); ++i) {
- PColumnValue column = filter->values(i);
- assign_func(_context->hybrid_set, column);
- }
- }
-
- size_t get_in_filter_size() const {
- return _context->hybrid_set ? _context->hybrid_set->size() : 0;
- }
-
- std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const {
- return _context->bitmap_filter_func;
- }
-
- friend class IRuntimeFilter;
-
- void set_filter_id(int id) {
- if (_context->bloom_filter_func) {
- _context->bloom_filter_func->set_filter_id(id);
- }
- if (_context->bitmap_filter_func) {
- _context->bitmap_filter_func->set_filter_id(id);
- }
- if (_context->hybrid_set) {
- _context->hybrid_set->set_filter_id(id);
- }
- }
-
-private:
- // When a runtime filter received from remote and it is a bloom filter,
_column_return_type will be invalid.
- PrimitiveType _column_return_type; // column type
- RuntimeFilterType _filter_type;
- int32_t _max_in_num = -1;
-
- RuntimeFilterContextSPtr _context;
- uint32_t _filter_id;
-};
Status IRuntimeFilter::create(RuntimeFilterParamsContext* state, const
TRuntimeFilterDesc* desc,
const TQueryOptions* query_options, const
RuntimeFilterRole role,
@@ -1794,4 +1089,642 @@ Status RuntimePredicateWrapper::get_push_exprs(
RuntimeFilterWrapperHolder::RuntimeFilterWrapperHolder() = default;
RuntimeFilterWrapperHolder::~RuntimeFilterWrapperHolder() = default;
+Status RuntimePredicateWrapper::init(const RuntimeFilterParams* params) {
+ _max_in_num = params->max_in_num;
+ switch (_filter_type) {
+ case RuntimeFilterType::IN_FILTER: {
+ _context->hybrid_set.reset(create_set(_column_return_type));
+ _context->hybrid_set->set_null_aware(params->null_aware);
+ break;
+ }
+ // Only use in nested loop join not need set null aware
+ case RuntimeFilterType::MIN_FILTER:
+ case RuntimeFilterType::MAX_FILTER: {
+ _context->minmax_func.reset(create_minmax_filter(_column_return_type));
+ break;
+ }
+ case RuntimeFilterType::MINMAX_FILTER: {
+ _context->minmax_func.reset(create_minmax_filter(_column_return_type));
+ _context->minmax_func->set_null_aware(params->null_aware);
+ break;
+ }
+ case RuntimeFilterType::BLOOM_FILTER: {
+
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
+ _context->bloom_filter_func->init_params(params);
+ return Status::OK();
+ }
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ _context->hybrid_set.reset(create_set(_column_return_type));
+ _context->hybrid_set->set_null_aware(params->null_aware);
+
_context->bloom_filter_func.reset(create_bloom_filter(_column_return_type));
+ _context->bloom_filter_func->init_params(params);
+ return Status::OK();
+ }
+ case RuntimeFilterType::BITMAP_FILTER: {
+
_context->bitmap_filter_func.reset(create_bitmap_filter(_column_return_type));
+ _context->bitmap_filter_func->set_not_in(params->bitmap_filter_not_in);
+ return Status::OK();
+ }
+ default:
+ return Status::InternalError("Unknown Filter type");
+ }
+ return Status::OK();
+}
+
+Status RuntimePredicateWrapper::change_to_bloom_filter() {
+ if (_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ return Status::InternalError(
+ "Can not change to bloom filter because of runtime filter type
is {}",
+ IRuntimeFilter::to_string(_filter_type));
+ }
+ BloomFilterFuncBase* bf = _context->bloom_filter_func.get();
+
+ if (bf != nullptr) {
+ insert_to_bloom_filter(bf);
+ } else if (_context->hybrid_set != nullptr && _context->hybrid_set->size()
!= 0) {
+ return Status::InternalError("change to bloom filter need empty set ",
+ IRuntimeFilter::to_string(_filter_type));
+ }
+
+ // release in filter
+ _context->hybrid_set.reset();
+ return Status::OK();
+}
+
+void RuntimePredicateWrapper::set_filter_id(int id) {
+ if (_context->bloom_filter_func) {
+ _context->bloom_filter_func->set_filter_id(id);
+ }
+ if (_context->bitmap_filter_func) {
+ _context->bitmap_filter_func->set_filter_id(id);
+ }
+ if (_context->hybrid_set) {
+ _context->hybrid_set->set_filter_id(id);
+ }
+}
+
+void RuntimePredicateWrapper::batch_assign(
+ const PInFilter* filter,
+ void (*assign_func)(std::shared_ptr<HybridSetBase>& _hybrid_set,
PColumnValue&)) {
+ for (int i = 0; i < filter->values_size(); ++i) {
+ PColumnValue column = filter->values(i);
+ assign_func(_context->hybrid_set, column);
+ }
+}
+
+Status RuntimePredicateWrapper::init_bloom_filter(const size_t
build_bf_cardinality) {
+ if (_filter_type != RuntimeFilterType::BLOOM_FILTER &&
+ _filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ throw Exception(ErrorCode::INTERNAL_ERROR, "init_bloom_filter meet
invalid input type {}",
+ int(_filter_type));
+ }
+ return
_context->bloom_filter_func->init_with_cardinality(build_bf_cardinality);
+}
+
+bool RuntimePredicateWrapper::get_build_bf_cardinality() const {
+ if (_filter_type == RuntimeFilterType::BLOOM_FILTER ||
+ _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ return _context->bloom_filter_func->get_build_bf_cardinality();
+ }
+ return false;
+}
+
+void RuntimePredicateWrapper::insert_to_bloom_filter(BloomFilterFuncBase*
bloom_filter) const {
+ if (_context->hybrid_set->size() > 0) {
+ auto* it = _context->hybrid_set->begin();
+ while (it->has_next()) {
+ bloom_filter->insert(it->get_value());
+ it->next();
+ }
+ }
+ if (_context->hybrid_set->contain_null()) {
+ bloom_filter->set_contain_null_and_null_aware();
+ }
+}
+
+void RuntimePredicateWrapper::insert_fixed_len(const vectorized::ColumnPtr&
column, size_t start) {
+ if (is_ignored()) {
+ throw Exception(ErrorCode::INTERNAL_ERROR, "insert_fixed_len meet
ignored rf");
+ }
+ switch (_filter_type) {
+ case RuntimeFilterType::IN_FILTER: {
+ _context->hybrid_set->insert_fixed_len(column, start);
+ break;
+ }
+ case RuntimeFilterType::MIN_FILTER:
+ case RuntimeFilterType::MAX_FILTER:
+ case RuntimeFilterType::MINMAX_FILTER: {
+ _context->minmax_func->insert_fixed_len(column, start);
+ break;
+ }
+ case RuntimeFilterType::BLOOM_FILTER: {
+ _context->bloom_filter_func->insert_fixed_len(column, start);
+ break;
+ }
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ if (is_bloomfilter()) {
+ _context->bloom_filter_func->insert_fixed_len(column, start);
+ } else {
+ _context->hybrid_set->insert_fixed_len(column, start);
+ }
+ break;
+ }
+ default:
+ DCHECK(false);
+ break;
+ }
+}
+
+void RuntimePredicateWrapper::bitmap_filter_insert_batch(const
vectorized::ColumnPtr column,
+ size_t start) {
+ std::vector<const BitmapValue*> bitmaps;
+ if (column->is_nullable()) {
+ const auto* nullable = assert_cast<const
vectorized::ColumnNullable*>(column.get());
+ const auto& col =
+ assert_cast<const
vectorized::ColumnBitmap&>(nullable->get_nested_column());
+ const auto& nullmap =
+ assert_cast<const
vectorized::ColumnUInt8&>(nullable->get_null_map_column())
+ .get_data();
+ for (size_t i = start; i < column->size(); i++) {
+ if (!nullmap[i]) {
+ bitmaps.push_back(&(col.get_data()[i]));
+ }
+ }
+ } else {
+ const auto* col = assert_cast<const
vectorized::ColumnBitmap*>(column.get());
+ for (size_t i = start; i < column->size(); i++) {
+ bitmaps.push_back(&(col->get_data()[i]));
+ }
+ }
+ _context->bitmap_filter_func->insert_many(bitmaps);
+}
+
+size_t RuntimePredicateWrapper::get_bloom_filter_size() const {
+ return _context->bloom_filter_func ?
_context->bloom_filter_func->get_size() : 0;
+}
+
+Status RuntimePredicateWrapper::merge(const RuntimePredicateWrapper* wrapper) {
+ if (wrapper->is_disabled()) {
+ set_disabled();
+ return Status::OK();
+ }
+
+ if (wrapper->is_ignored() || is_disabled()) {
+ return Status::OK();
+ }
+
+ _context->ignored = false;
+
+ bool can_not_merge_in_or_bloom =
+ _filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+ (wrapper->_filter_type != RuntimeFilterType::IN_FILTER &&
+ wrapper->_filter_type != RuntimeFilterType::BLOOM_FILTER &&
+ wrapper->_filter_type != RuntimeFilterType::IN_OR_BLOOM_FILTER);
+
+ bool can_not_merge_other = _filter_type !=
RuntimeFilterType::IN_OR_BLOOM_FILTER &&
+ _filter_type != wrapper->_filter_type;
+
+ CHECK(!can_not_merge_in_or_bloom && !can_not_merge_other)
+ << " can not merge runtime filter(id=" << _filter_id << "),
current is filter type is "
+ << IRuntimeFilter::to_string(_filter_type) << ", other filter type
is "
+ << IRuntimeFilter::to_string(wrapper->_filter_type);
+
+ switch (_filter_type) {
+ case RuntimeFilterType::IN_FILTER: {
+ _context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
+ if (_max_in_num >= 0 && _context->hybrid_set->size() >= _max_in_num) {
+ set_disabled();
+ }
+ break;
+ }
+ case RuntimeFilterType::MIN_FILTER:
+ case RuntimeFilterType::MAX_FILTER:
+ case RuntimeFilterType::MINMAX_FILTER: {
+
RETURN_IF_ERROR(_context->minmax_func->merge(wrapper->_context->minmax_func.get()));
+ break;
+ }
+ case RuntimeFilterType::BLOOM_FILTER: {
+ RETURN_IF_ERROR(
+
_context->bloom_filter_func->merge(wrapper->_context->bloom_filter_func.get()));
+ break;
+ }
+ case RuntimeFilterType::IN_OR_BLOOM_FILTER: {
+ auto real_filter_type = get_real_type();
+
+ auto other_filter_type = wrapper->_filter_type;
+ if (other_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ other_filter_type = wrapper->get_real_type();
+ }
+
+ if (real_filter_type == RuntimeFilterType::IN_FILTER) {
+ // when we meet base rf is in-filter, threre only have two case:
+ // case1: all input-filter's build_bf_exactly is true, inited by
synced global size
+ // case2: all input-filter's build_bf_exactly is false, inited by
default size
+ if (other_filter_type == RuntimeFilterType::IN_FILTER) {
+
_context->hybrid_set->insert(wrapper->_context->hybrid_set.get());
+ if (_max_in_num >= 0 && _context->hybrid_set->size() >=
_max_in_num) {
+ // case2: use default size to init bf
+
RETURN_IF_ERROR(_context->bloom_filter_func->init_with_fixed_length());
+ RETURN_IF_ERROR(change_to_bloom_filter());
+ }
+ } else {
+ // case1&case2: use input bf directly and insert hybrid set
data into bf
+ _context->bloom_filter_func =
wrapper->_context->bloom_filter_func;
+ RETURN_IF_ERROR(change_to_bloom_filter());
+ }
+ } else {
+ if (other_filter_type == RuntimeFilterType::IN_FILTER) {
+ // case2: insert data to global filter
+
wrapper->insert_to_bloom_filter(_context->bloom_filter_func.get());
+ } else {
+ // case1&case2: all input bf must has same size
+ RETURN_IF_ERROR(_context->bloom_filter_func->merge(
+ wrapper->_context->bloom_filter_func.get()));
+ }
+ }
+ break;
+ }
+ case RuntimeFilterType::BITMAP_FILTER: {
+ // use input bitmap directly because we assume bitmap filter join
always have full data
+ _context->bitmap_filter_func = wrapper->_context->bitmap_filter_func;
+ break;
+ }
+ default:
+ return Status::InternalError("unknown runtime filter");
+ }
+ return Status::OK();
+}
+
+Status RuntimePredicateWrapper::assign(const PInFilter* in_filter, bool
contain_null) {
+ _context->hybrid_set.reset(create_set(_column_return_type));
+ if (contain_null) {
+ _context->hybrid_set->set_null_aware(true);
+ _context->hybrid_set->insert((const void*)nullptr);
+ }
+
+ switch (_column_return_type) {
+ case TYPE_BOOLEAN: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ bool bool_val = column.boolval();
+ set->insert(&bool_val);
+ });
+ break;
+ }
+ case TYPE_TINYINT: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto int_val = static_cast<int8_t>(column.intval());
+ set->insert(&int_val);
+ });
+ break;
+ }
+ case TYPE_SMALLINT: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto int_val = static_cast<int16_t>(column.intval());
+ set->insert(&int_val);
+ });
+ break;
+ }
+ case TYPE_INT: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ int32_t int_val = column.intval();
+ set->insert(&int_val);
+ });
+ break;
+ }
+ case TYPE_BIGINT: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ int64_t long_val = column.longval();
+ set->insert(&long_val);
+ });
+ break;
+ }
+ case TYPE_LARGEINT: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto string_val = column.stringval();
+ StringParser::ParseResult result;
+ auto int128_val =
StringParser::string_to_int<int128_t>(string_val.c_str(),
+
string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ set->insert(&int128_val);
+ });
+ break;
+ }
+ case TYPE_FLOAT: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto float_val = static_cast<float>(column.doubleval());
+ set->insert(&float_val);
+ });
+ break;
+ }
+ case TYPE_DOUBLE: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ double double_val = column.doubleval();
+ set->insert(&double_val);
+ });
+ break;
+ }
+ case TYPE_DATEV2: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto date_v2_val = column.intval();
+ set->insert(&date_v2_val);
+ });
+ break;
+ }
+ case TYPE_DATETIMEV2: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto date_v2_val = column.longval();
+ set->insert(&date_v2_val);
+ });
+ break;
+ }
+ case TYPE_DATETIME:
+ case TYPE_DATE: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ const auto& string_val_ref = column.stringval();
+ VecDateTimeValue datetime_val;
+ datetime_val.from_date_str(string_val_ref.c_str(),
string_val_ref.length());
+ set->insert(&datetime_val);
+ });
+ break;
+ }
+ case TYPE_DECIMALV2: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ const auto& string_val_ref = column.stringval();
+ DecimalV2Value decimal_val(string_val_ref);
+ set->insert(&decimal_val);
+ });
+ break;
+ }
+ case TYPE_DECIMAL32: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ int32_t decimal_32_val = column.intval();
+ set->insert(&decimal_32_val);
+ });
+ break;
+ }
+ case TYPE_DECIMAL64: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ int64_t decimal_64_val = column.longval();
+ set->insert(&decimal_64_val);
+ });
+ break;
+ }
+ case TYPE_DECIMAL128I: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto string_val = column.stringval();
+ StringParser::ParseResult result;
+ auto int128_val =
StringParser::string_to_int<int128_t>(string_val.c_str(),
+
string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ set->insert(&int128_val);
+ });
+ break;
+ }
+ case TYPE_DECIMAL256: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto string_val = column.stringval();
+ StringParser::ParseResult result;
+ auto int_val =
StringParser::string_to_int<wide::Int256>(string_val.c_str(),
+
string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ set->insert(&int_val);
+ });
+ break;
+ }
+ case TYPE_VARCHAR:
+ case TYPE_CHAR:
+ case TYPE_STRING: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ const std::string& string_value = column.stringval();
+ // string_value is std::string, call insert(data, size) function
in StringSet will not cast as StringRef
+ // so could avoid some cast error at different class object.
+ set->insert((void*)string_value.data(), string_value.size());
+ });
+ break;
+ }
+ case TYPE_IPV4: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ int32_t tmp = column.intval();
+ set->insert(&tmp);
+ });
+ break;
+ }
+ case TYPE_IPV6: {
+ batch_assign(in_filter, [](std::shared_ptr<HybridSetBase>& set,
PColumnValue& column) {
+ auto string_val = column.stringval();
+ StringParser::ParseResult result;
+ auto int128_val =
StringParser::string_to_int<uint128_t>(string_val.c_str(),
+
string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ set->insert(&int128_val);
+ });
+ break;
+ }
+ default: {
+ return Status::InternalError("not support assign to in filter, type: "
+
+ type_to_string(_column_return_type));
+ }
+ }
+ return Status::OK();
+}
+
+void RuntimePredicateWrapper::set_enable_fixed_len_to_uint32_v2() {
+ if (_context->bloom_filter_func) {
+ _context->bloom_filter_func->set_enable_fixed_len_to_uint32_v2();
+ }
+}
+
+Status RuntimePredicateWrapper::assign(const PBloomFilter* bloom_filter,
+ butil::IOBufAsZeroCopyInputStream*
data, bool contain_null) {
+ // we won't use this class to insert or find any data
+ // so any type is ok
+ _context->bloom_filter_func.reset(create_bloom_filter(
+ _column_return_type == INVALID_TYPE ? PrimitiveType::TYPE_INT :
_column_return_type));
+ RETURN_IF_ERROR(
+ _context->bloom_filter_func->assign(data,
bloom_filter->filter_length(), contain_null));
+ return Status::OK();
+}
+
+// used by shuffle runtime filter
+// assign this filter by protobuf
+Status RuntimePredicateWrapper::assign(const PMinMaxFilter* minmax_filter,
bool contain_null) {
+ _context->minmax_func.reset(create_minmax_filter(_column_return_type));
+
+ if (contain_null) {
+ _context->minmax_func->set_null_aware(true);
+ _context->minmax_func->set_contain_null();
+ }
+
+ switch (_column_return_type) {
+ case TYPE_BOOLEAN: {
+ bool min_val = minmax_filter->min_val().boolval();
+ bool max_val = minmax_filter->max_val().boolval();
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_TINYINT: {
+ auto min_val = static_cast<int8_t>(minmax_filter->min_val().intval());
+ auto max_val = static_cast<int8_t>(minmax_filter->max_val().intval());
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_SMALLINT: {
+ auto min_val = static_cast<int16_t>(minmax_filter->min_val().intval());
+ auto max_val = static_cast<int16_t>(minmax_filter->max_val().intval());
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_INT: {
+ int32_t min_val = minmax_filter->min_val().intval();
+ int32_t max_val = minmax_filter->max_val().intval();
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_BIGINT: {
+ int64_t min_val = minmax_filter->min_val().longval();
+ int64_t max_val = minmax_filter->max_val().longval();
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_LARGEINT: {
+ auto min_string_val = minmax_filter->min_val().stringval();
+ auto max_string_val = minmax_filter->max_val().stringval();
+ StringParser::ParseResult result;
+ auto min_val =
StringParser::string_to_int<int128_t>(min_string_val.c_str(),
+
min_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ auto max_val =
StringParser::string_to_int<int128_t>(max_string_val.c_str(),
+
max_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_FLOAT: {
+ auto min_val =
static_cast<float>(minmax_filter->min_val().doubleval());
+ auto max_val =
static_cast<float>(minmax_filter->max_val().doubleval());
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DOUBLE: {
+ auto min_val =
static_cast<double>(minmax_filter->min_val().doubleval());
+ auto max_val =
static_cast<double>(minmax_filter->max_val().doubleval());
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DATEV2: {
+ int32_t min_val = minmax_filter->min_val().intval();
+ int32_t max_val = minmax_filter->max_val().intval();
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DATETIMEV2: {
+ int64_t min_val = minmax_filter->min_val().longval();
+ int64_t max_val = minmax_filter->max_val().longval();
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DATETIME:
+ case TYPE_DATE: {
+ const auto& min_val_ref = minmax_filter->min_val().stringval();
+ const auto& max_val_ref = minmax_filter->max_val().stringval();
+ VecDateTimeValue min_val;
+ VecDateTimeValue max_val;
+ min_val.from_date_str(min_val_ref.c_str(), min_val_ref.length());
+ max_val.from_date_str(max_val_ref.c_str(), max_val_ref.length());
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DECIMALV2: {
+ const auto& min_val_ref = minmax_filter->min_val().stringval();
+ const auto& max_val_ref = minmax_filter->max_val().stringval();
+ DecimalV2Value min_val(min_val_ref);
+ DecimalV2Value max_val(max_val_ref);
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DECIMAL32: {
+ int32_t min_val = minmax_filter->min_val().intval();
+ int32_t max_val = minmax_filter->max_val().intval();
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DECIMAL64: {
+ int64_t min_val = minmax_filter->min_val().longval();
+ int64_t max_val = minmax_filter->max_val().longval();
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DECIMAL128I: {
+ auto min_string_val = minmax_filter->min_val().stringval();
+ auto max_string_val = minmax_filter->max_val().stringval();
+ StringParser::ParseResult result;
+ auto min_val =
StringParser::string_to_int<int128_t>(min_string_val.c_str(),
+
min_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ auto max_val =
StringParser::string_to_int<int128_t>(max_string_val.c_str(),
+
max_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_DECIMAL256: {
+ auto min_string_val = minmax_filter->min_val().stringval();
+ auto max_string_val = minmax_filter->max_val().stringval();
+ StringParser::ParseResult result;
+ auto min_val =
StringParser::string_to_int<wide::Int256>(min_string_val.c_str(),
+
min_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ auto max_val =
StringParser::string_to_int<wide::Int256>(max_string_val.c_str(),
+
max_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ case TYPE_VARCHAR:
+ case TYPE_CHAR:
+ case TYPE_STRING: {
+ auto min_val_ref = minmax_filter->min_val().stringval();
+ auto max_val_ref = minmax_filter->max_val().stringval();
+ return _context->minmax_func->assign(&min_val_ref, &max_val_ref);
+ }
+ case TYPE_IPV4: {
+ int tmp_min = minmax_filter->min_val().intval();
+ int tmp_max = minmax_filter->max_val().intval();
+ return _context->minmax_func->assign(&tmp_min, &tmp_max);
+ }
+ case TYPE_IPV6: {
+ auto min_string_val = minmax_filter->min_val().stringval();
+ auto max_string_val = minmax_filter->max_val().stringval();
+ StringParser::ParseResult result;
+ auto min_val =
StringParser::string_to_int<uint128_t>(min_string_val.c_str(),
+
min_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ auto max_val =
StringParser::string_to_int<uint128_t>(max_string_val.c_str(),
+
max_string_val.length(), &result);
+ DCHECK(result == StringParser::PARSE_SUCCESS);
+ return _context->minmax_func->assign(&min_val, &max_val);
+ }
+ default:
+ break;
+ }
+ return Status::InternalError("not support!");
+}
+
+void RuntimePredicateWrapper::get_bloom_filter_desc(char** data, int*
filter_length) {
+ _context->bloom_filter_func->get_data(data, filter_length);
+}
+
+bool RuntimePredicateWrapper::contain_null() const {
+ if (is_bloomfilter()) {
+ return _context->bloom_filter_func->contain_null();
+ }
+ if (_context->hybrid_set) {
+ if (get_real_type() != RuntimeFilterType::IN_FILTER) {
+ throw Exception(ErrorCode::INTERNAL_ERROR, "rf has hybrid_set but
real type is {}",
+ int(get_real_type()));
+ }
+ return _context->hybrid_set->contain_null();
+ }
+ if (_context->minmax_func) {
+ return _context->minmax_func->contain_null();
+ }
+ return false;
+}
+
+size_t RuntimePredicateWrapper::get_in_filter_size() const {
+ return _context->hybrid_set ? _context->hybrid_set->size() : 0;
+}
+
+void RuntimePredicateWrapper::set_disabled() {
+ _context->disabled = true;
+ _context->minmax_func.reset();
+ _context->hybrid_set.reset();
+ _context->bloom_filter_func.reset();
+ _context->bitmap_filter_func.reset();
+}
+
} // namespace doris
diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h
index 441de7d4da3..e38f6901ef4 100644
--- a/be/src/exprs/runtime_filter.h
+++ b/be/src/exprs/runtime_filter.h
@@ -443,4 +443,115 @@ private:
WrapperPtr _wrapper;
};
+// This class is a wrapper of runtime predicate function
+class RuntimePredicateWrapper {
+public:
+ RuntimePredicateWrapper(const RuntimeFilterParams* params)
+ : RuntimePredicateWrapper(params->column_return_type,
params->filter_type,
+ params->filter_id) {};
+ // for a 'tmp' runtime predicate wrapper
+ // only could called assign method or as a param for merge
+ RuntimePredicateWrapper(PrimitiveType column_type, RuntimeFilterType type,
uint32_t filter_id)
+ : _column_return_type(column_type),
+ _filter_type(type),
+ _context(new RuntimeFilterContext()),
+ _filter_id(filter_id) {}
+
+ // init runtime filter wrapper
+ // alloc memory to init runtime filter function
+ Status init(const RuntimeFilterParams* params);
+
+ Status change_to_bloom_filter();
+
+ Status init_bloom_filter(const size_t build_bf_cardinality);
+
+ bool get_build_bf_cardinality() const;
+
+ void insert_to_bloom_filter(BloomFilterFuncBase* bloom_filter) const;
+
+ BloomFilterFuncBase* get_bloomfilter() const { return
_context->bloom_filter_func.get(); }
+
+ void insert_fixed_len(const vectorized::ColumnPtr& column, size_t start);
+
+ void insert_batch(const vectorized::ColumnPtr& column, size_t start) {
+ if (get_real_type() == RuntimeFilterType::BITMAP_FILTER) {
+ bitmap_filter_insert_batch(column, start);
+ } else {
+ insert_fixed_len(column, start);
+ }
+ }
+
+ void bitmap_filter_insert_batch(const vectorized::ColumnPtr column, size_t
start);
+
+ RuntimeFilterType get_real_type() const {
+ if (_filter_type == RuntimeFilterType::IN_OR_BLOOM_FILTER) {
+ if (_context->hybrid_set) {
+ return RuntimeFilterType::IN_FILTER;
+ }
+ return RuntimeFilterType::BLOOM_FILTER;
+ }
+ return _filter_type;
+ }
+
+ size_t get_bloom_filter_size() const;
+
+ Status get_push_exprs(std::list<vectorized::VExprContextSPtr>& probe_ctxs,
+ std::vector<vectorized::VRuntimeFilterPtr>&
push_exprs,
+ const TExpr& probe_expr);
+
+ Status merge(const RuntimePredicateWrapper* wrapper);
+
+ Status assign(const PInFilter* in_filter, bool contain_null);
+
+ void set_enable_fixed_len_to_uint32_v2();
+
+ // used by shuffle runtime filter
+ // assign this filter by protobuf
+ Status assign(const PBloomFilter* bloom_filter,
butil::IOBufAsZeroCopyInputStream* data,
+ bool contain_null);
+
+ // used by shuffle runtime filter
+ // assign this filter by protobuf
+ Status assign(const PMinMaxFilter* minmax_filter, bool contain_null);
+
+ void get_bloom_filter_desc(char** data, int* filter_length);
+
+ PrimitiveType column_type() { return _column_return_type; }
+
+ bool is_bloomfilter() const { return get_real_type() ==
RuntimeFilterType::BLOOM_FILTER; }
+
+ bool contain_null() const;
+
+ bool is_ignored() const { return _context->ignored; }
+
+ void set_ignored() { _context->ignored = true; }
+
+ bool is_disabled() const { return _context->disabled; }
+
+ void set_disabled();
+
+ void batch_assign(const PInFilter* filter,
+ void (*assign_func)(std::shared_ptr<HybridSetBase>&
_hybrid_set,
+ PColumnValue&));
+
+ size_t get_in_filter_size() const;
+
+ std::shared_ptr<BitmapFilterFuncBase> get_bitmap_filter() const {
+ return _context->bitmap_filter_func;
+ }
+
+ friend class IRuntimeFilter;
+
+ void set_filter_id(int id);
+
+private:
+ // When a runtime filter received from remote and it is a bloom filter,
_column_return_type will be invalid.
+ PrimitiveType _column_return_type; // column type
+ RuntimeFilterType _filter_type;
+ int32_t _max_in_num = -1;
+
+ RuntimeFilterContextSPtr _context;
+ uint32_t _filter_id;
+};
+
} // namespace doris
diff --git a/be/src/exprs/runtime_filter_slots.h
b/be/src/exprs/runtime_filter_slots.h
index a9dd631e358..b732ae155f7 100644
--- a/be/src/exprs/runtime_filter_slots.h
+++ b/be/src/exprs/runtime_filter_slots.h
@@ -35,11 +35,7 @@ public:
VRuntimeFilterSlots(
const std::vector<std::shared_ptr<vectorized::VExprContext>>&
build_expr_ctxs,
const std::vector<std::shared_ptr<IRuntimeFilter>>&
runtime_filters)
- : _build_expr_context(build_expr_ctxs),
_runtime_filters(runtime_filters) {
- for (auto runtime_filter : _runtime_filters) {
-
_runtime_filters_map[runtime_filter->expr_order()].push_back(runtime_filter.get());
- }
- }
+ : _build_expr_context(build_expr_ctxs),
_runtime_filters(runtime_filters) {}
Status send_filter_size(RuntimeState* state, uint64_t hash_table_size,
std::shared_ptr<pipeline::CountedFinishDependency>
dependency) {
@@ -139,62 +135,48 @@ public:
}
void insert(const vectorized::Block* block) {
- for (int i = 0; i < _build_expr_context.size(); ++i) {
- auto iter = _runtime_filters_map.find(i);
- if (iter == _runtime_filters_map.end()) {
- continue;
- }
-
- int result_column_id =
_build_expr_context[i]->get_last_result_column_id();
+ for (auto& filter : _runtime_filters) {
+ int result_column_id =
+
_build_expr_context[filter->expr_order()]->get_last_result_column_id();
const auto& column =
block->get_by_position(result_column_id).column;
- for (auto* filter : iter->second) {
- if (filter->get_ignored() || filter->get_disabled()) {
- continue;
- }
- filter->insert_batch(column, 1);
+ if (filter->get_ignored() || filter->get_disabled()) {
+ continue;
}
+ filter->insert_batch(column, 1);
}
}
// publish runtime filter
Status publish(RuntimeState* state, bool publish_local) {
- for (auto& pair : _runtime_filters_map) {
- for (auto& filter : pair.second) {
- RETURN_IF_ERROR(filter->publish(state, publish_local));
- }
+ for (auto& filter : _runtime_filters) {
+ RETURN_IF_ERROR(filter->publish(state, publish_local));
}
return Status::OK();
}
void copy_to_shared_context(vectorized::SharedHashTableContextPtr&
context) {
- for (auto& it : _runtime_filters_map) {
- for (auto& filter : it.second) {
- context->runtime_filters[filter->filter_id()] =
filter->get_shared_context_ref();
- }
+ for (auto& filter : _runtime_filters) {
+ context->runtime_filters[filter->filter_id()] =
filter->get_shared_context_ref();
}
}
Status copy_from_shared_context(vectorized::SharedHashTableContextPtr&
context) {
- for (auto& it : _runtime_filters_map) {
- for (auto& filter : it.second) {
- auto filter_id = filter->filter_id();
- auto ret = context->runtime_filters.find(filter_id);
- if (ret == context->runtime_filters.end()) {
- return Status::Aborted("invalid runtime filter id: {}",
filter_id);
- }
- filter->get_shared_context_ref() = ret->second;
+ for (auto& filter : _runtime_filters) {
+ auto filter_id = filter->filter_id();
+ auto ret = context->runtime_filters.find(filter_id);
+ if (ret == context->runtime_filters.end()) {
+ return Status::Aborted("invalid runtime filter id: {}",
filter_id);
}
+ filter->get_shared_context_ref() = ret->second;
}
return Status::OK();
}
- bool empty() { return _runtime_filters_map.empty(); }
+ bool empty() { return _runtime_filters.empty(); }
private:
const std::vector<std::shared_ptr<vectorized::VExprContext>>&
_build_expr_context;
std::vector<std::shared_ptr<IRuntimeFilter>> _runtime_filters;
- // prob_contition index -> [IRuntimeFilter]
- std::map<int, std::list<IRuntimeFilter*>> _runtime_filters_map;
};
} // namespace doris
diff --git a/be/src/pipeline/exec/operator.cpp
b/be/src/pipeline/exec/operator.cpp
index bb254aae72b..9ca0f0fcd40 100644
--- a/be/src/pipeline/exec/operator.cpp
+++ b/be/src/pipeline/exec/operator.cpp
@@ -295,7 +295,7 @@ Status OperatorXBase::do_projections(RuntimeState* state,
vectorized::Block* ori
*_output_row_descriptor);
if (rows != 0) {
auto& mutable_columns = mutable_block.mutable_columns();
- DCHECK(mutable_columns.size() == local_state->_projections.size());
+ DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) <<
debug_string();
for (int i = 0; i < mutable_columns.size(); ++i) {
auto result_column_id = -1;
RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block,
&result_column_id));
diff --git a/be/src/pipeline/pipeline_fragment_context.cpp
b/be/src/pipeline/pipeline_fragment_context.cpp
index e8e8ed5d9fe..c8e3429107c 100644
--- a/be/src/pipeline/pipeline_fragment_context.cpp
+++ b/be/src/pipeline/pipeline_fragment_context.cpp
@@ -363,6 +363,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const
doris::TPipelineFrag
const auto target_size = request.local_params.size();
_tasks.resize(target_size);
_runtime_filter_states.resize(target_size);
+ _runtime_filter_mgr_map.resize(target_size);
_task_runtime_states.resize(_pipelines.size());
for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) {
_task_runtime_states[pip_idx].resize(_pipelines[pip_idx]->num_tasks());
@@ -510,7 +511,7 @@ Status PipelineFragmentContext::_build_pipeline_tasks(const
doris::TPipelineFrag
}
{
std::lock_guard<std::mutex> l(_state_map_lock);
- _runtime_filter_mgr_map[fragment_instance_id] =
std::move(runtime_filter_mgr);
+ _runtime_filter_mgr_map[i] = std::move(runtime_filter_mgr);
}
return Status::OK();
};
diff --git a/be/src/pipeline/pipeline_fragment_context.h
b/be/src/pipeline/pipeline_fragment_context.h
index d672ad6e923..bd3a350d0a2 100644
--- a/be/src/pipeline/pipeline_fragment_context.h
+++ b/be/src/pipeline/pipeline_fragment_context.h
@@ -275,8 +275,7 @@ private:
_op_id_to_le_state;
std::map<PipelineId, Pipeline*> _pip_id_to_pipeline;
- // UniqueId -> runtime mgr
- std::map<UniqueId, std::unique_ptr<RuntimeFilterMgr>>
_runtime_filter_mgr_map;
+ std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgr_map;
//Here are two types of runtime states:
// - _runtime state is at the Fragment level.
diff --git a/be/src/pipeline/task_queue.h b/be/src/pipeline/task_queue.h
index 1651eb50cac..2218d70fde6 100644
--- a/be/src/pipeline/task_queue.h
+++ b/be/src/pipeline/task_queue.h
@@ -107,12 +107,15 @@ class MultiCoreTaskQueue {
public:
explicit MultiCoreTaskQueue(int core_size);
+#ifndef BE_TEST
~MultiCoreTaskQueue();
-
- void close();
-
// Get the task by core id.
PipelineTask* take(int core_id);
+#else
+ virtual ~MultiCoreTaskQueue();
+ virtual PipelineTask* take(int core_id);
+#endif
+ void close();
// TODO combine these methods to `push_back(task, core_id = -1)`
Status push_back(PipelineTask* task);
diff --git a/be/test/pipeline/dummy_task_queue.h
b/be/test/pipeline/dummy_task_queue.h
new file mode 100644
index 00000000000..bace0ecae96
--- /dev/null
+++ b/be/test/pipeline/dummy_task_queue.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "pipeline/task_queue.h"
+
+namespace doris::pipeline {
+
+// Test-only MultiCoreTaskQueue whose take() makes a single, bounded pass over the
+// possible task sources instead of retrying, so unit tests can poll the queue and
+// observe nullptr when no task is runnable.
+class DummyTaskQueue final : public MultiCoreTaskQueue {
+public:
+    explicit DummyTaskQueue(int core_size) : MultiCoreTaskQueue(core_size) {}
+    ~DummyTaskQueue() override = default;
+
+    // Returns a runnable task for `core_id`, or nullptr if none was found.
+    // Sources are tried in this order:
+    //   1. the core's own queue via try_take(false),
+    //   2. _steal_take(core_id) (presumably steals from other cores' queues),
+    //   3. the core's own queue via take(1) (presumably a short wait — confirm
+    //      against the priority queue's take()).
+    // Before a task is returned, pop_out_runnable_queue() is called on it.
+    PipelineTask* take(int core_id) override {
+        // Cast avoids a signed/unsigned comparison; a negative core_id still fails.
+        DCHECK(static_cast<size_t>(core_id) < _prio_task_queues.size())
+                << " list size: " << _prio_task_queues.size() << " core_id: " << core_id
+                << " _core_size: " << _core_size << " _next_core: " << _next_core.load();
+        PipelineTask* task = _prio_task_queues[core_id].try_take(false);
+        if (task == nullptr) {
+            task = _steal_take(core_id);
+        }
+        if (task == nullptr) {
+            task = _prio_task_queues[core_id].take(1);
+        }
+        if (task != nullptr) {
+            task->pop_out_runnable_queue();
+        }
+        return task;
+    }
+};
+} // namespace doris::pipeline
diff --git a/be/test/pipeline/pipeline_test.cpp
b/be/test/pipeline/pipeline_test.cpp
index d1c0af85f58..6466d6c2927 100644
--- a/be/test/pipeline/pipeline_test.cpp
+++ b/be/test/pipeline/pipeline_test.cpp
@@ -22,12 +22,15 @@
#include "common/exception.h"
#include "common/status.h"
+#include "dummy_task_queue.h"
+#include "exprs/bloom_filter_func.h"
+#include "exprs/hybrid_set.h"
+#include "exprs/runtime_filter.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/pipeline_fragment_context.h"
-#include "pipeline/task_queue.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "thrift_builder.h"
@@ -45,10 +48,13 @@ class PipelineTest : public testing::Test {
public:
PipelineTest()
: _obj_pool(new ObjectPool()),
- _mgr(std::make_unique<doris::vectorized::VDataStreamMgr>()) {
+ _mgr(std::make_unique<doris::vectorized::VDataStreamMgr>()) {}
+ ~PipelineTest() override = default;
+ void SetUp() override {
_query_options = TQueryOptionsBuilder()
.set_enable_local_exchange(true)
.set_enable_local_shuffle(true)
+ .set_runtime_filter_max_in_num(15)
.build();
auto fe_address = TNetworkAddress();
fe_address.hostname = LOCALHOST;
@@ -56,10 +62,12 @@ public:
_query_ctx = QueryContext::create_shared(_query_id,
ExecEnv::GetInstance(), _query_options,
fe_address, true, fe_address,
QuerySource::INTERNAL_FRONTEND);
+ _query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
+ TRuntimeFilterParamsBuilder().build());
ExecEnv::GetInstance()->set_stream_mgr(_mgr.get());
- _task_queue = std::make_unique<MultiCoreTaskQueue>(1);
+ _task_queue = std::make_unique<DummyTaskQueue>(1);
}
- ~PipelineTest() override = default;
+ void TearDown() override {}
private:
std::shared_ptr<Pipeline> _build_pipeline(int num_instances, Pipeline*
parent = nullptr) {
@@ -111,6 +119,7 @@ private:
_pipeline_profiles.clear();
_pipeline_tasks.clear();
_runtime_states.clear();
+ _runtime_filter_mgrs.clear();
}
int _next_fragment_id() { return _fragment_id++; }
int _next_node_id() { return _next_node_idx++; }
@@ -125,7 +134,7 @@ private:
std::shared_ptr<QueryContext> _query_ctx;
TUniqueId _query_id = TUniqueId();
TQueryOptions _query_options;
- std::unique_ptr<MultiCoreTaskQueue> _task_queue;
+ std::unique_ptr<DummyTaskQueue> _task_queue;
// Fragment level
// Fragment0 -> Fragment1
@@ -149,6 +158,9 @@ private:
std::vector<std::vector<std::unique_ptr<PipelineTask>>> _pipeline_tasks;
std::vector<std::vector<std::unique_ptr<RuntimeState>>> _runtime_states;
+ // Instance level
+ std::vector<std::unique_ptr<RuntimeFilterMgr>> _runtime_filter_mgrs;
+
const std::string LOCALHOST = BackendOptions::get_localhost();
const int DUMMY_PORT = config::brpc_port;
};
@@ -184,7 +196,7 @@ TEST_F(PipelineTest, HAPPY_PATH) {
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
- .set_nullIndicatorBit(0)
+ .set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_isMaterialized(true)
@@ -489,7 +501,7 @@ TEST_F(PipelineTest, PLAN_LOCAL_EXCHANGE) {
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
- .set_nullIndicatorBit(0)
+ .set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_isMaterialized(true)
@@ -585,7 +597,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
- .set_nullIndicatorBit(0)
+ .set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_isMaterialized(true)
@@ -604,7 +616,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
- .set_nullIndicatorBit(0)
+ .set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_isMaterialized(true)
@@ -623,7 +635,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
- .set_nullIndicatorBit(0)
+ .set_nullIndicatorBit(-1)
.set_byteOffset(0)
.set_slotIdx(0)
.set_isMaterialized(true)
@@ -640,7 +652,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
.set_scalar_type(TPrimitiveType::INT)
.build())
.build())
- .set_nullIndicatorBit(0)
+ .set_nullIndicatorBit(-1)
.set_byteOffset(4)
.set_slotIdx(1)
.set_isMaterialized(true)
@@ -720,6 +732,73 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
.append_vintermediate_tuple_id_list(1)
.build())
.append_row_tuples(2, false)
+ .append_projections(
+ TExprBuilder()
+ .append_nodes(
+ TExprNodeBuilder(
+ TExprNodeType::SLOT_REF,
+ TTypeDescBuilder()
+ .set_types(
+
TTypeNodeBuilder()
+
.set_type(
+
TTypeNodeType::
+
SCALAR)
+
.set_scalar_type(
+
TPrimitiveType::
+
INT)
+
.build())
+ .build(),
+ 0)
+
.set_slot_ref(TSlotRefBuilder(0, 0).build())
+ .build())
+ .build())
+ .append_projections(
+ TExprBuilder()
+ .append_nodes(
+ TExprNodeBuilder(
+ TExprNodeType::SLOT_REF,
+ TTypeDescBuilder()
+ .set_types(
+
TTypeNodeBuilder()
+
.set_type(
+
TTypeNodeType::
+
SCALAR)
+
.set_scalar_type(
+
TPrimitiveType::
+
INT)
+
.build())
+ .build(),
+ 0)
+
.set_slot_ref(TSlotRefBuilder(1, 1).build())
+ .build())
+ .build())
+ .append_runtime_filters(
+ TRuntimeFilterDescBuilder(
+ 0,
+ TExprBuilder()
+ .append_nodes(
+ TExprNodeBuilder(
+
TExprNodeType::SLOT_REF,
+ TTypeDescBuilder()
+ .set_types(
+
TTypeNodeBuilder()
+
.set_type(
+
TTypeNodeType::
+
SCALAR)
+
.set_scalar_type(
+
TPrimitiveType::
+
INT)
+
.build())
+ .build(),
+ 0)
+ .set_slot_ref(
+
TSlotRefBuilder(1, 1).build())
+ .build())
+ .build(),
+ 0, std::map<TPlanNodeId, TExpr> {})
+ .set_bloom_filter_size_bytes(1048576)
+ .set_build_bf_exactly(false)
+ .build())
.build();
{
@@ -850,6 +929,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
{
// Build pipeline task
int task_id = 0;
+ _runtime_filter_mgrs.resize(parallelism);
+ for (int j = 0; j < parallelism; j++) {
+ auto runtime_filter_state =
RuntimeFilterParamsContext::create(_query_ctx.get());
+ _runtime_filter_mgrs[j] = std::make_unique<RuntimeFilterMgr>(
+ _query_id, runtime_filter_state,
_query_ctx->query_mem_tracker, false);
+ }
for (size_t i = 0; i < _pipelines.size(); i++) {
EXPECT_EQ(_pipelines[i]->id(), i);
_pipeline_profiles[_pipelines[i]->id()] =
std::make_shared<RuntimeProfile>(
@@ -871,6 +956,8 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
local_runtime_state->set_task_num(_pipelines[i]->num_tasks());
local_runtime_state->set_task_execution_context(
std::static_pointer_cast<TaskExecutionContext>(_context.back()));
+
local_runtime_state->set_runtime_filter_mgr(_runtime_filter_mgrs[j].get());
+
_runtime_filter_mgrs[j]->_state->set_state(local_runtime_state.get());
std::map<int,
std::pair<std::shared_ptr<LocalExchangeSharedState>,
std::shared_ptr<Dependency>>>
le_state_map;
@@ -891,6 +978,7 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
}
std::shared_ptr<vectorized::VDataStreamRecvr> downstream_recvr;
+ auto downstream_pipeline_profile =
std::make_shared<RuntimeProfile>("Downstream Pipeline");
{
// Build downstream recvr
auto context = _build_fragment_context();
@@ -900,13 +988,12 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
downstream_runtime_state->set_task_execution_context(
std::static_pointer_cast<TaskExecutionContext>(context));
- auto downstream_pipeline_profile =
std::make_shared<RuntimeProfile>("Downstream Pipeline");
auto* memory_used_counter =
downstream_pipeline_profile->AddHighWaterMarkCounter(
"MemoryUsage", TUnit::BYTES, "", 1);
downstream_recvr = ExecEnv::GetInstance()->_vstream_mgr->create_recvr(
downstream_runtime_state.get(), memory_used_counter,
_pipelines.front()->operators().back()->row_desc(),
dest_ins_id, dest_node_id,
- parallelism, downstream_pipeline_profile.get(), false, 20480);
+ parallelism, downstream_pipeline_profile.get(), false,
2048000);
}
for (size_t i = 0; i < _pipelines.size(); i++) {
for (int j = 0; j < parallelism; j++) {
@@ -914,21 +1001,16 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
EXPECT_EQ(_pipeline_tasks[_pipelines[i]->id()][j]->prepare(scan_ranges, j,
tsink,
_query_ctx.get()),
Status::OK());
+ if (i == 1) {
+ auto& local_state = _runtime_states[i][j]
+ ->get_sink_local_state()
+
->cast<HashJoinBuildSinkLocalState>();
+ EXPECT_EQ(local_state._runtime_filters.size(), 1);
+ EXPECT_EQ(local_state._should_build_hash_table, true);
+ }
}
}
- // Construct input block
- vectorized::Block block;
- {
- vectorized::DataTypePtr int_type =
std::make_shared<vectorized::DataTypeInt32>();
-
- auto int_col0 = vectorized::ColumnInt32::create();
- int_col0->insert_many_vals(1, 10);
- block.insert({std::move(int_col0), int_type, "test_int_col0"});
- }
- auto block_mem_usage = block.allocated_bytes();
- EXPECT_GT(block_mem_usage - 1, 0);
-
{
for (size_t i = 0; i < _pipelines.size(); i++) {
for (int j = 0; j < parallelism; j++) {
@@ -960,24 +1042,146 @@ TEST_F(PipelineTest, PLAN_HASH_JOIN) {
{
for (int i = _pipelines.size() - 1; i >= 0; i--) {
for (int j = 0; j < parallelism; j++) {
+ bool eos = false;
+ EXPECT_EQ(_pipeline_tasks[i][j]->execute(&eos), Status::OK());
+ EXPECT_EQ(_pipeline_tasks[i][j]->_opened, true);
+ EXPECT_EQ(eos, false);
+ }
+ }
+ }
+ for (int i = _pipelines.size() - 1; i >= 0; i--) {
+ for (int j = 0; j < parallelism; j++) {
+ {
+ vectorized::Block block;
+ {
+ vectorized::DataTypePtr int_type =
+ std::make_shared<vectorized::DataTypeInt32>();
+
+ auto int_col0 = vectorized::ColumnInt32::create();
+ if (j == 0 || i == 0) {
+ int_col0->insert_many_vals(j, 10);
+ } else {
+ size_t ndv = 16;
+ for (size_t n = 0; n < ndv; n++) {
+ int_col0->insert_many_vals(n, 1);
+ }
+ }
+
+ block.insert({std::move(int_col0), int_type,
"test_int_col0"});
+ }
auto& local_state =
_runtime_states[i][j]
->get_local_state(_pipelines[i]->operators().front()->operator_id())
->cast<ExchangeLocalState>();
-
local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
-
- bool eos = false;
- EXPECT_EQ(_pipeline_tasks[i][j]->execute(&eos), Status::OK());
- EXPECT_EQ(_pipeline_tasks[i][j]->_is_blocked(), false);
- EXPECT_EQ(eos, true);
- EXPECT_EQ(_pipeline_tasks[i][j]->is_pending_finish(), false);
- EXPECT_EQ(_pipeline_tasks[i][j]->close(Status::OK()),
Status::OK());
+
EXPECT_EQ(local_state.stream_recvr->_sender_queues[0]->_source_dependency->ready(),
+ false);
+ EXPECT_EQ(local_state.stream_recvr->_sender_queues[0]
+ ->_source_dependency->_blocked_task.size(),
+ i == 1 ? 1 : 0);
+ local_state.stream_recvr->_sender_queues[0]->add_block(&block,
true);
+ }
+ }
+ }
+ {
+        // Pipeline 1 is blocked by the exchange dependency, so its tasks become ready once data arrives.
+        // Pipeline 0 is blocked by the hash join dependency and is still waiting for the upstream tasks to finish.
+ for (int j = 0; j < parallelism; j++) {
+            // The task is ready and has been pushed into the runnable task queue.
+ EXPECT_EQ(_task_queue->take(0) != nullptr, true);
+ }
+ EXPECT_EQ(_task_queue->take(0), nullptr);
+ for (int j = 0; j < parallelism; j++) {
+ EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
+ }
+ }
+ {
+        // Pipeline 1 runs first and builds the hash table in the join build operator.
+ for (int j = 0; j < parallelism; j++) {
+ bool eos = false;
+ EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK());
+ EXPECT_EQ(eos, false);
+ }
+ for (int j = 0; j < parallelism; j++) {
+ auto& local_state =
+ _runtime_states[1][j]
+
->get_local_state(_pipelines[1]->operators().front()->operator_id())
+ ->cast<ExchangeLocalState>();
+ local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
+
+ bool eos = false;
+ EXPECT_EQ(_pipeline_tasks[1][j]->execute(&eos), Status::OK());
+ EXPECT_EQ(_pipeline_tasks[1][j]->_is_blocked(), false);
+ EXPECT_EQ(eos, true);
+ auto& sink_local_state = _runtime_states[1][j]
+ ->get_sink_local_state()
+
->cast<HashJoinBuildSinkLocalState>();
+ EXPECT_EQ(sink_local_state._runtime_filters_disabled, false);
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters.size(), 1);
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+ ->need_sync_filter_size(),
+ false);
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+ ->_runtime_filter_type,
+ RuntimeFilterType::IN_OR_BLOOM_FILTER);
+ EXPECT_EQ(_pipeline_tasks[1][j]->is_pending_finish(), false);
+ EXPECT_EQ(_pipeline_tasks[1][j]->close(Status::OK()),
Status::OK());
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]->get_real_type(),
+ j == 0 ? RuntimeFilterType::IN_FILTER :
RuntimeFilterType::BLOOM_FILTER)
+ << " " << j << " "
+ << IRuntimeFilter::to_string(
+
sink_local_state._runtime_filter_slots->_runtime_filters[0]
+ ->get_real_type());
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+ ->_wrapper->is_ignored(),
+ false);
+ if (j == 0) {
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+ ->_wrapper->_context->hybrid_set->size(),
+ 1);
+ } else {
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+
->_wrapper->_context->bloom_filter_func->_build_bf_exactly,
+ false);
+
+
EXPECT_EQ(sink_local_state._runtime_filter_slots->_runtime_filters[0]
+
->_wrapper->_context->bloom_filter_func->_bloom_filter_length,
+ 1048576);
}
}
- {
-
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 0);
-
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders, 0);
+ }
+ {
+        // Pipeline 0 runs once the hash table is built.
+ for (int j = 0; j < parallelism; j++) {
+ EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false);
}
+ for (int j = 0; j < parallelism; j++) {
+ bool eos = false;
+ EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK());
+ EXPECT_EQ(eos, false);
+ }
+ for (int j = 0; j < parallelism; j++) {
+ auto& local_state =
+ _runtime_states[0][j]
+
->get_local_state(_pipelines[0]->operators().front()->operator_id())
+ ->cast<ExchangeLocalState>();
+ local_state.stream_recvr->_sender_queues[0]->decrement_senders(0);
+
+ bool eos = false;
+ EXPECT_EQ(_pipeline_tasks[0][j]->execute(&eos), Status::OK());
+ EXPECT_EQ(_pipeline_tasks[0][j]->_is_blocked(), false);
+ EXPECT_EQ(eos, true);
+ EXPECT_EQ(_pipeline_tasks[0][j]->is_pending_finish(), false);
+ EXPECT_EQ(_pipeline_tasks[0][j]->close(Status::OK()),
Status::OK());
+ }
+ }
+ {
+        // Instance 0: probe [0, 0, 0, 0, 0, 0, 0, 0, 0, 0] join build [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
+        // produces 10 * 10 = 100 rows.
+        // Instance 1: probe [1, 1, 1, 1, 1, 1, 1, 1, 1, 1] join build [0, 1, 2, ..., 15]
+        // produces 10 rows (each probe row matches the single build row with value 1).
+ EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.size(), 2);
+
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.front()._block->rows(),
+ 10 * 10);
+
EXPECT_EQ(downstream_recvr->_sender_queues[0]->_block_queue.back()._block->rows(),
10);
+ EXPECT_EQ(downstream_recvr->_sender_queues[0]->_num_remaining_senders,
0);
}
downstream_recvr->close();
}
diff --git a/be/test/pipeline/thrift_builder.h
b/be/test/pipeline/thrift_builder.h
index bdcead88616..1af1ca760bd 100644
--- a/be/test/pipeline/thrift_builder.h
+++ b/be/test/pipeline/thrift_builder.h
@@ -58,7 +58,7 @@ public:
return *this;
}
TQueryOptionsBuilder& set_enable_new_shuffle_hash_method(bool
enable_new_shuffle_hash_method) {
- _query_options.enable_new_shuffle_hash_method =
enable_new_shuffle_hash_method;
+
_query_options.__set_enable_new_shuffle_hash_method(enable_new_shuffle_hash_method);
return *this;
}
TQueryOptionsBuilder& set_enable_local_shuffle(bool enable_local_shuffle) {
@@ -66,11 +66,23 @@ public:
return *this;
}
TQueryOptionsBuilder& set_runtime_filter_wait_infinitely(bool
runtime_filter_wait_infinitely) {
- _query_options.runtime_filter_wait_infinitely =
runtime_filter_wait_infinitely;
+
_query_options.__set_runtime_filter_wait_infinitely(runtime_filter_wait_infinitely);
return *this;
}
TQueryOptionsBuilder& set_enable_local_merge_sort(bool
enable_local_merge_sort) {
- _query_options.enable_local_merge_sort = enable_local_merge_sort;
+ _query_options.__set_enable_local_merge_sort(enable_local_merge_sort);
+ return *this;
+ }
+    // Runtime-filter sizing knobs. Each setter goes through the thrift __set_* method
+    // so the corresponding __isset flag is raised, and returns the builder for chaining.
+    TQueryOptionsBuilder& set_runtime_filter_max_in_num(int64_t runtime_filter_max_in_num) {
+        auto& options = _query_options;
+        options.__set_runtime_filter_max_in_num(runtime_filter_max_in_num);
+        return *this;
+    }
+    TQueryOptionsBuilder& set_runtime_bloom_filter_min_size(int64_t runtime_bloom_filter_min_size) {
+        auto& options = _query_options;
+        options.__set_runtime_bloom_filter_min_size(runtime_bloom_filter_min_size);
+        return *this;
+    }
+    TQueryOptionsBuilder& set_runtime_bloom_filter_max_size(int64_t runtime_bloom_filter_max_size) {
+        auto& options = _query_options;
+        options.__set_runtime_bloom_filter_max_size(runtime_bloom_filter_max_size);
+        return *this;
+    }
@@ -117,6 +129,16 @@ public:
_plan_node.__set_output_tuple_id(output_tuple_id);
return *this;
}
+    // Appends one projection expression (copied) and marks `projections` as set.
+    // Takes a const reference so const objects and temporaries are accepted too.
+    TPlanNodeBuilder& append_projections(const TExpr& projections) {
+        _plan_node.__isset.projections = true;
+        _plan_node.projections.push_back(projections);
+        return *this;
+    }
+    // Appends one runtime filter descriptor (copied) and marks `runtime_filters` as set.
+    TPlanNodeBuilder& append_runtime_filters(const TRuntimeFilterDesc& runtime_filter) {
+        _plan_node.__isset.runtime_filters = true;
+        _plan_node.runtime_filters.push_back(runtime_filter);
+        return *this;
+    }
TPlanNode& build() { return _plan_node; }
@@ -127,6 +149,55 @@ private:
TPlanNode _plan_node;
};
+// Test helper that assembles a TRuntimeFilterDesc. The constructor sets (and marks as
+// set) the filter id, source expr, expr order, target-expr map, broadcast/target flags
+// and filter type; optional bloom-filter knobs are set through the chained setters.
+class TRuntimeFilterDescBuilder {
+public:
+    // `src_expr` is taken by const reference so a single constructor accepts both
+    // lvalues (e.g. the TExpr& returned by TExprBuilder::build()) and temporaries.
+    // `planId_to_target_expr` is only copied into the desc, so it is also taken by const&.
+    explicit TRuntimeFilterDescBuilder(
+            int filter_id, const TExpr& src_expr, int expr_order,
+            const std::map<TPlanNodeId, TExpr>& planId_to_target_expr,
+            bool is_broadcast_join = false, bool has_local_targets = true,
+            bool has_remote_targets = false,
+            TRuntimeFilterType::type type = TRuntimeFilterType::IN_OR_BLOOM)
+            : _desc() {
+        _desc.__set_filter_id(filter_id);
+        _desc.__set_src_expr(src_expr);
+        _desc.__set_expr_order(expr_order);
+        _desc.__set_planId_to_target_expr(planId_to_target_expr);
+        _desc.__set_is_broadcast_join(is_broadcast_join);
+        _desc.__set_has_local_targets(has_local_targets);
+        _desc.__set_has_remote_targets(has_remote_targets);
+        _desc.__set_type(type);
+    }
+
+    TRuntimeFilterDescBuilder& set_bloom_filter_size_bytes(int64_t bloom_filter_size_bytes) {
+        _desc.__set_bloom_filter_size_bytes(bloom_filter_size_bytes);
+        return *this;
+    }
+    TRuntimeFilterDescBuilder& set_build_bf_exactly(bool build_bf_exactly) {
+        _desc.__set_build_bf_exactly(build_bf_exactly);
+        return *this;
+    }
+    // Returns a reference to the builder-owned desc; copy it before the builder dies.
+    TRuntimeFilterDesc& build() { return _desc; }
+    TRuntimeFilterDescBuilder(const TRuntimeFilterDescBuilder&) = delete;
+    void operator=(const TRuntimeFilterDescBuilder&) = delete;
+
+private:
+    TRuntimeFilterDesc _desc;
+};
+
class TExchangeNodeBuilder {
public:
explicit TExchangeNodeBuilder() : _plan_node() {}
@@ -269,7 +340,10 @@ private:
class TTypeDescBuilder {
public:
- explicit TTypeDescBuilder() : _desc() {}
+ explicit TTypeDescBuilder() : _desc() {
+ _desc.__set_result_is_nullable(false);
+ _desc.__set_is_nullable(false);
+ }
TTypeDescBuilder& set_types(TTypeNode type_node) {
_desc.types.push_back(type_node);
@@ -423,6 +497,7 @@ public:
_expr_node.__set_type(type);
_expr_node.__set_num_children(num_children);
_expr_node.__set_opcode(opcode);
+ _expr_node.__set_is_nullable(false);
}
explicit TExprNodeBuilder(TExprNodeType::type node_type, TTypeDesc&& type,
int num_children,
TExprOpcode::type opcode =
TExprOpcode::INVALID_OPCODE)
@@ -477,4 +552,27 @@ private:
TEqJoinCondition _eq_conjuncts;
};
+// Test helper that assembles a TRuntimeFilterParams. All five fields are written through
+// their thrift __set_* setters (so each is marked as set); every argument defaults to an
+// empty value, so TRuntimeFilterParamsBuilder().build() yields set-but-empty fields.
+class TRuntimeFilterParamsBuilder {
+public:
+    explicit TRuntimeFilterParamsBuilder(
+            TNetworkAddress runtime_filter_merge_addr = TNetworkAddress(),
+            std::map<int, std::vector<TRuntimeFilterTargetParams>> rid_to_target_param = {},
+            std::map<int, TRuntimeFilterDesc> rid_to_runtime_filter = {},
+            std::map<int, int> runtime_filter_builder_num = {},
+            std::map<int, std::vector<TRuntimeFilterTargetParamsV2>> rid_to_target_paramv2 = {}) {
+        auto& params = _params;
+        params.__set_runtime_filter_merge_addr(runtime_filter_merge_addr);
+        params.__set_rid_to_target_param(rid_to_target_param);
+        params.__set_rid_to_runtime_filter(rid_to_runtime_filter);
+        params.__set_runtime_filter_builder_num(runtime_filter_builder_num);
+        params.__set_rid_to_target_paramv2(rid_to_target_paramv2);
+    }
+    // Returns a reference to the builder-owned params; copy it before the builder dies.
+    TRuntimeFilterParams& build() { return _params; }
+    TRuntimeFilterParamsBuilder(const TRuntimeFilterParamsBuilder&) = delete;
+    void operator=(const TRuntimeFilterParamsBuilder&) = delete;
+
+private:
+    TRuntimeFilterParams _params {};
+};
+
} // namespace doris::pipeline
diff --git a/gensrc/thrift/PaloInternalService.thrift
b/gensrc/thrift/PaloInternalService.thrift
index 70db19bfa33..9ab20842c25 100644
--- a/gensrc/thrift/PaloInternalService.thrift
+++ b/gensrc/thrift/PaloInternalService.thrift
@@ -392,18 +392,21 @@ struct TRuntimeFilterTargetParamsV2 {
}
struct TRuntimeFilterParams {
- // Runtime filter merge instance address
+ // Runtime filter merge instance address. Used if this filter has a remote
target
1: optional Types.TNetworkAddress runtime_filter_merge_addr
// deprecated
2: optional map<i32, list<TRuntimeFilterTargetParams>> rid_to_target_param
// Runtime filter ID to the runtime filter desc
+ // Used if this filter has a remote target
3: optional map<i32, PlanNodes.TRuntimeFilterDesc> rid_to_runtime_filter
// Number of Runtime filter producers
+ // Used if this filter has a remote target
4: optional map<i32, i32> runtime_filter_builder_num
+ // Used if this filter has a remote target
5: optional map<i32, list<TRuntimeFilterTargetParamsV2>>
rid_to_target_paramv2
}
diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift
index be920c2baf5..c7f8d247d7c 100644
--- a/gensrc/thrift/PlanNodes.thrift
+++ b/gensrc/thrift/PlanNodes.thrift
@@ -1252,7 +1252,7 @@ struct TRuntimeFilterDesc {
// The order of Expr in join predicate
3: required i32 expr_order
- // Map of target node id to the target expr
+ // Map of target node id to the target expr. Used by consumer
4: required map<Types.TPlanNodeId, Exprs.TExpr> planId_to_target_expr
// Indicates if the source join node of this filter is a broadcast or
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]